Initialization of broadcast state before processing main stream

Vasily Melnik
Hi all.

In our task we have two Kafka topics:
- one with the fact stream (web traffic)
- one with dimension data

We would like to put the dimension data into broadcast state and look it up when processing facts. But we see that not all dimension records are in the state before the first fact record is processed, so the lookup returns no data.

The question is: how could we read the fact topic with some "delay" to give the dimension stream enough time to initialize the state?


Best regards,
Vasily Melnik


Re: Initialization of broadcast state before processing main stream

vino yang
Hi Vasily,

Currently, Flink does not coordinate a regular stream with a broadcast stream; to Flink, both are just streams. Your use of broadcast state is a special case. In the more general case, the state to be broadcast comes from an unbounded stream, and state events may be broadcast downstream at any time, so Flink cannot wait for that stream to finish before processing events from the regular stream.

For your case:

  • you can move the dimension table into external storage, e.g. Redis or MySQL, and join the stream against it there;
  • you can inject a control event into your broadcast stream to mark its end and make the fact stream wait until that control event arrives. Alternatively, you can introduce a third-party coordinator such as ZooKeeper to coordinate them, although that makes your solution more complex.
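A minimal sketch of the end-of-stream marker idea, modeled in plain Java rather than with Flink's API. Everything here is hypothetical: the class and method names are invented, and it assumes the producer of the dimension topic writes one marker record into every partition once the initial load has been written. In a Flink job, this bookkeeping would sit in processBroadcastElement of a BroadcastProcessFunction. Because every broadcast element reaches every parallel instance of that operator, each instance can track the markers on its own.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Plain-Java model of the broadcast side (no Flink APIs); all names are hypothetical.
final class DimensionLoadTracker {
    private final int expectedPartitions;                  // partitions that must send a marker
    private final Map<String, String> dimensions = new HashMap<>();
    private final Set<Integer> partitionsWithMarker = new HashSet<>();

    DimensionLoadTracker(int expectedPartitions) {
        if (expectedPartitions <= 0) {
            throw new IllegalArgumentException("expectedPartitions must be positive");
        }
        this.expectedPartitions = expectedPartitions;
    }

    // Regular dimension record: insert or overwrite the lookup entry.
    void onDimension(String key, String value) {
        dimensions.put(key, value);
    }

    // Hypothetical control event: the initial load of `partition` is finished.
    // A set (not a counter) ignores duplicate markers from the same partition.
    void onEndOfLoadMarker(int partition) {
        partitionsWithMarker.add(partition);
    }

    // True once every expected partition has delivered its marker.
    boolean isLoaded() {
        return partitionsWithMarker.size() >= expectedPartitions;
    }

    Optional<String> lookup(String key) {
        return Optional.ofNullable(dimensions.get(key));
    }
}
```

The fact side would check isLoaded() and hold facts back until it turns true.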
Best,
Vino


On Thu, Nov 14, 2019 at 1:28 PM, Vasily Melnik <[hidden email]> wrote:
[...]


Re: Initialization of broadcast state before processing main stream

Maxim Parkachov
Hi Vasily,

unfortunately, this is a known issue with Flink; you can read the discussion at https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

So far I have seen 3 solutions to this issue:

1. You buffer the fact stream in local state until the broadcast stream has been completely read.
2. You create a custom source for the fact stream and, in its open method, wait until the broadcast stream has been completely read.
3. With the latest Flink version, you can pre-populate state with the dimension data and start the Flink job from that existing state. You need to take care to set the correct Kafka offsets for the dimension stream, though; otherwise you will get a gap between the pre-populated state and the moment the job is started.

The first 2 solutions need to know when the broadcast stream has been "completely read". I built a workaround for this with a custom source for dimension events. It creates a "stop file" on a shared file system, reads the Kafka end offsets of the dimension topic through the admin interface, starts processing all messages from the beginning, and removes the "stop file" once the message offsets have reached the end offsets for all partitions. Instead of a "stop file", you could use a shared lock in ZooKeeper.
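A minimal sketch of option 1 in plain Java, assuming some other mechanism (for example, a marker or end-offset check like those discussed in this thread) signals when the dimension data is completely loaded. The names are hypothetical. In a real Flink job, the pending facts would be kept in Flink state (such as a ListState) so they survive checkpoints, rather than in an in-memory list as here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of option 1 (no Flink APIs); all names are hypothetical.
final class BufferingLookupJoin {
    private final Map<String, String> dimensions = new HashMap<>();
    private final List<String> pendingFacts = new ArrayList<>(); // would be Flink state in a real job
    private final List<String> output = new ArrayList<>();
    private boolean dimensionsLoaded = false;

    // Broadcast side: add or update a dimension entry.
    void onDimension(String key, String value) {
        dimensions.put(key, value);
    }

    // Called once the dimension stream is known to be completely read:
    // joins and emits every fact that was held back, in arrival order.
    void markDimensionsLoaded() {
        dimensionsLoaded = true;
        for (String factKey : pendingFacts) {
            emit(factKey);
        }
        pendingFacts.clear();
    }

    // Fact side: join immediately once loaded, otherwise buffer the fact.
    void onFact(String factKey) {
        if (dimensionsLoaded) {
            emit(factKey);
        } else {
            pendingFacts.add(factKey);
        }
    }

    private void emit(String factKey) {
        output.add(factKey + " -> " + dimensions.getOrDefault(factKey, "<missing>"));
    }

    List<String> output() {
        return output;
    }
}
```

The trade-off is that nothing is emitted until the load finishes, and the buffer grows with the fact rate for as long as the initial load takes.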
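A sketch of the "completely read" check described above, in plain Java. It is not Maxim's code: the class and method names are hypothetical, and it assumes (a) the start and end offsets per partition were fetched once at startup, for example through Kafka's admin interface, with the end offset being the offset of the next record to be written; (b) the topic is not written transactionally, since transaction markers occupy offsets that never arrive as records; and (c) one tracker covers all partitions, for example a non-parallel dimension source, because with several parallel instances the first one to finish would delete the shared stop file. The static awaitRelease method models the fact side waiting for the stop file to disappear (option 2) and assumes the file already exists when it is called.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the end-offset check plus a "stop file"; names are hypothetical.
final class InitialLoadTracker {
    private final Map<Integer, Long> endOffsets;  // partition -> offset of the next record to be written
    private final Map<Integer, Long> nextOffsets; // partition -> next offset still to be read
    private final Path stopFile;
    private boolean released = false;

    InitialLoadTracker(Map<Integer, Long> startOffsets, Map<Integer, Long> endOffsets, Path stopFile) {
        this.endOffsets = new HashMap<>(endOffsets);
        this.nextOffsets = new HashMap<>(startOffsets);
        this.stopFile = stopFile;
        try {
            Files.createFile(stopFile);           // "dimension load in progress"
        } catch (FileAlreadyExistsException e) {
            // Left over from an earlier run: keep it until this load completes.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        releaseIfComplete();                      // partitions may already be empty
    }

    // Call after the record at `offset` in `partition` has been put into state.
    void onRecord(int partition, long offset) {
        nextOffsets.merge(partition, offset + 1, Math::max);
        releaseIfComplete();
    }

    // Complete when every partition has been read up to its end offset.
    boolean isComplete() {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            if (nextOffsets.getOrDefault(e.getKey(), 0L) < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    private void releaseIfComplete() {
        if (!released && isComplete()) {
            try {
                Files.deleteIfExists(stopFile);   // lets the fact side proceed
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            released = true;
        }
    }

    // Fact side (option 2): e.g. from a source's open(), block until the stop file is gone.
    // Assumes the stop file was created before this is called.
    static void awaitRelease(Path stopFile, long pollMillis) throws InterruptedException {
        while (Files.exists(stopFile)) {
            Thread.sleep(pollMillis);
        }
    }
}
```

As noted in the thread, a shared lock in ZooKeeper could play the same role as the file.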

Hope this helps,
Maxim.

On Thu, Nov 14, 2019 at 7:42 AM vino yang <[hidden email]> wrote:
[...]


Re: Initialization of broadcast state before processing main stream

Vasily Melnik
Maxim, many thanks.
We'll try buffering.

Best regards,
Vasily Melnik



On Thu, 14 Nov 2019 at 19:36, Maxim Parkachov <[hidden email]> wrote:
[...]