Is Flink able to do real time stock market analysis?

Is Flink able to do real time stock market analysis?

Ivan Wang
Hi all,

I've spent nearly two weeks trying to figure out a solution to the requirement below. If anyone can advise, that would be great.

1. There will be around 2,000 transactions per second arriving as StreamRaw. I plan to apply a tumbling window to them, say every 15 seconds, to produce StreamA, and then a count window over StreamA, say every 20 events, to produce StreamB.

2. For every event E in StreamRaw, I need to find exactly one event in StreamB that is earlier than E and closest to E, and then run some comparisons. For example, if E's timestamp is 9:46:38, there should be an event in StreamB with timestamp 9:46:30, because I use a 15-second interval.

I tried CEP on StreamRaw; however, I couldn't figure out how to involve StreamB and fetch that single matching event inside the condition method.

I also tried the Table API and SQL, but it throws a time-attribute error on the second window call:

window(Tumble).group().select().window(Slide).group().select()

There seems to be no way to tell Flink the time attribute after the first window().groupBy(). I then tried converting the streams to tables first and doing a leftOuterJoin, but Flink tells me that's not supported.

Is Flink able to do this? If not, I'll go for other alternatives. Thanks again if someone can help.
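
For reference, this is roughly what I mean in DataStream terms. It's only a sketch: the inline test data and the averaging "smoothing" reduce are placeholders rather than my real logic, there is no keying, and the real source delivers the ~2,000 trades per second.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoStageWindowSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // StreamRaw: (event-time millis, price) trades; placeholder data instead of the real source.
        DataStream<Tuple2<Long, Double>> streamRaw = env
                .fromElements(Tuple2.of(1_000L, 10.0), Tuple2.of(16_000L, 10.4), Tuple2.of(31_000L, 10.2))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Double>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<Long, Double> trade) {
                        return trade.f0;
                    }
                });

        // StreamA: one value per 15-second tumbling event-time window (placeholder averaging).
        DataStream<Tuple2<Long, Double>> streamA = streamRaw
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(15)))
                .reduce((a, b) -> Tuple2.of(Math.max(a.f0, b.f0), (a.f1 + b.f1) / 2));

        // StreamB: every 20 StreamA results produce one further-smoothed value.
        DataStream<Tuple2<Long, Double>> streamB = streamA
                .countWindowAll(20)
                .reduce((a, b) -> Tuple2.of(Math.max(a.f0, b.f0), (a.f1 + b.f1) / 2));

        streamB.print();
        env.execute("two-stage windowing sketch");
    }
}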







Re: Is Flink able to do real time stock market analysis?

Michael Latta
I am new to Flink, so others may have a more complete answer or correct me.

If you are counting the events in a tumbling window, you will get output at the end of each tumbling window, i.e. a running count of events per window. It sounds like you want to compare the raw data to the smoothed data? You can use a CoFlatMap to receive both streams and output any records you like, say a Tuple with the raw and smoothed values. If you use a RichCoFlatMap you can also track state, so you could keep a list of the last 20 or so raw and smoothed values in order to align them.
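
Something along these lines, as a rough sketch (the class name, field layout, and types below are made up, and it only keeps the latest smoothed value rather than a list of 20):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class AlignFunction
        extends RichCoFlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {

    private transient ValueState<Double> latestSmoothed;

    @Override
    public void open(Configuration parameters) {
        latestSmoothed = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latestSmoothed", Double.class));
    }

    // Input 1: raw events (event time, price); emit (raw time, raw price, smoothed price).
    @Override
    public void flatMap1(Tuple2<Long, Double> raw, Collector<Tuple3<Long, Double, Double>> out) throws Exception {
        Double smoothed = latestSmoothed.value();
        if (smoothed != null) {
            out.collect(Tuple3.of(raw.f0, raw.f1, smoothed));
        }
    }

    // Input 2: smoothed events; just remember the latest value.
    @Override
    public void flatMap2(Tuple2<Long, Double> smoothed, Collector<Tuple3<Long, Double, Double>> out) throws Exception {
        latestSmoothed.update(smoothed.f1);
    }
}

You would key both streams on the same field before connecting them, e.g. raw.keyBy(...).connect(smoothed.keyBy(...)).flatMap(new AlignFunction()), since keyed state needs keyed inputs.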

Michael


Re: Is Flink able to do real time stock market analysis?

Ivan Wang-2

Thanks very much, Michael, it helps a lot!

I tried what you suggested and now I can compare the smoothed data with the raw data in the CoFlatMap method.

However, this alone cannot ensure that the smoothed data arrives in the expected order. Basically, for every raw event I'd like to refer to the earlier but closest event in the smoothed data, and by default that cannot be guaranteed. For example, when a raw event comes with event time 13:01:39, I'd like to refer to the smoothed event with event time 13:01:30 (because of the 15-second interval), but that smoothed event only arrives after the raw event at 13:01:58. This happened at least in batch mode, when I ran the historical analysis.

I corrected the order by using keyed state in the CoFlatMap method: I store the latest smoothed event and queue raw events that arrive too early.
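
Roughly, the buffering looks like the sketch below. It is a simplified illustration, not my exact code: the field layout is made up, and it stores smoothed values in a MapState keyed by the 15-second window start so that queued raw events can be flushed when "their" smoothed value shows up.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class OrderedAlignFunction
        extends RichCoFlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {

    private static final long INTERVAL_MS = 15_000L; // 15-second tumbling windows

    private transient MapState<Long, Double> smoothedByWindow;    // window start -> smoothed value
    private transient ListState<Tuple2<Long, Double>> pendingRaw; // raw events that arrived too early

    @Override
    public void open(Configuration parameters) {
        smoothedByWindow = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("smoothedByWindow", Long.class, Double.class));
        pendingRaw = getRuntimeContext().getListState(new ListStateDescriptor<>(
                "pendingRaw", TypeInformation.of(new TypeHint<Tuple2<Long, Double>>() {})));
    }

    @Override
    public void flatMap1(Tuple2<Long, Double> raw, Collector<Tuple3<Long, Double, Double>> out) throws Exception {
        Double smoothed = smoothedByWindow.get(raw.f0 - raw.f0 % INTERVAL_MS);
        if (smoothed != null) {
            out.collect(Tuple3.of(raw.f0, raw.f1, smoothed)); // matching smoothed value already here
        } else {
            pendingRaw.add(raw);                              // queue until it arrives
        }
    }

    @Override
    public void flatMap2(Tuple2<Long, Double> smoothed, Collector<Tuple3<Long, Double, Double>> out) throws Exception {
        smoothedByWindow.put(smoothed.f0, smoothed.f1);
        Iterable<Tuple2<Long, Double>> queued = pendingRaw.get();
        if (queued != null) {
            List<Tuple2<Long, Double>> stillPending = new ArrayList<>();
            for (Tuple2<Long, Double> raw : queued) {
                Double s = smoothedByWindow.get(raw.f0 - raw.f0 % INTERVAL_MS);
                if (s != null) {
                    out.collect(Tuple3.of(raw.f0, raw.f1, s));
                } else {
                    stillPending.add(raw);
                }
            }
            pendingRaw.update(stillPending);
        }
    }
}

The wiring is the same as before: key both streams on the same field, connect them, and apply this function.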

 

My question is: is there a better, more straightforward way to correct the order? This approach makes the code hard to read. I'm thinking about watermarks, but I'm not sure how to apply them here.

 

 

-- 

Thanks

Ivan



Re: Is Flink able to do real time stock market analysis?

Michael Latta
Given that the data from a window cannot arrive before any of the data in that window, it will always arrive after the raw data for the same period, and it may have some latency relative to the raw data. If your RichCoFlatMapFunction uses a ListState to hold more than one window's worth of raw and smoothed data, you should be able to get what you want. Given distributed systems and relative time, I am not sure you will get much simpler than that.

Michael


Re: Is Flink able to do real time stock market analysis?

Fabian Hueske-2
Hi Ivan,

You can certainly do these things with Flink.
Michael pointed you in a good direction if you want to implement the logic in the DataStream API / ProcessFunctions.

Flink's SQL support should also be able to handle the use case you described.

The "ingredients" would be
- a TUMBLE window [1] with a TUMBLE_ROWTIME [2] in the SELECT clause to forward the rowtime of the window.
- an OVER window with an ORDER BY time ROW for the sliding window (just forward the time attribute in SELECT) [3]
- a windowed join to join the row event with the smoothed aggregate [4]
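
Roughly, and with made-up table and column names (trades, symbol, tradeTime, price), those three steps could look like the sketch below. The exact syntax depends on the Flink version, each intermediate query would be registered as a table or view, and picking exactly the single closest row would still need an additional filter on top of the join.

-- StreamA: 15-second tumbling windows; TUMBLE_ROWTIME forwards the window's rowtime.
SELECT
  symbol,
  AVG(price)                                      AS winPrice,
  TUMBLE_ROWTIME(tradeTime, INTERVAL '15' SECOND) AS winTime
FROM trades
GROUP BY symbol, TUMBLE(tradeTime, INTERVAL '15' SECOND);

-- StreamB: smoothed value over the last 20 windows (OVER window on the forwarded time attribute).
SELECT
  symbol,
  winTime,
  AVG(winPrice) OVER (
    PARTITION BY symbol
    ORDER BY winTime
    ROWS BETWEEN 19 PRECEDING AND CURRENT ROW)    AS smoothed
FROM streamA;

-- Windowed join: pair each raw trade with a smoothed value from an earlier, nearby window.
SELECT t.symbol, t.tradeTime, t.price, b.smoothed
FROM trades t, streamB b
WHERE t.symbol = b.symbol
  AND b.winTime BETWEEN t.tradeTime - INTERVAL '30' SECOND AND t.tradeTime;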

Best,
Fabian
