Hi all.
In our task we have two Kafka topics:
- one with a fact stream (web traffic)
- one with dimension data

We would like to put the dimension data into broadcast state and look it up while processing the facts. But we see that not all dimension records are put into the state before the first fact record is processed, so the lookup returns no data. The question is: how could we read the fact topic with some "delay" to give the dimension stream enough time to initialize the state?

Best regards,
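For reference, a minimal sketch of the setup being described, assuming Flink's broadcast state API. Fact, Dimension, EnrichedFact, the dimKey/key fields and the two source streams are hypothetical placeholders, and the snippet is a fragment of a job definition rather than a complete program:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Descriptor for the broadcast (dimension) state, keyed by the dimension key.
MapStateDescriptor<String, Dimension> dimDescriptor = new MapStateDescriptor<>(
        "dimensions", Types.STRING, TypeInformation.of(Dimension.class));

// Broadcast the dimension topic to all parallel instances of the join operator.
BroadcastStream<Dimension> dimBroadcast = dimensionStream.broadcast(dimDescriptor);

DataStream<EnrichedFact> enriched = factStream
        .connect(dimBroadcast)
        .process(new BroadcastProcessFunction<Fact, Dimension, EnrichedFact>() {

            @Override
            public void processElement(Fact fact, ReadOnlyContext ctx,
                                        Collector<EnrichedFact> out) throws Exception {
                // Returns null if the matching dimension record has not been
                // broadcast yet, which is exactly the race described above.
                Dimension dim = ctx.getBroadcastState(dimDescriptor).get(fact.dimKey);
                out.collect(new EnrichedFact(fact, dim));
            }

            @Override
            public void processBroadcastElement(Dimension dim, Context ctx,
                                                Collector<EnrichedFact> out) throws Exception {
                // Every dimension record is stored in the broadcast state as it arrives.
                ctx.getBroadcastState(dimDescriptor).put(dim.key, dim);
            }
        });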
Hi Vasily,

Currently, Flink does not coordinate between a regular stream and a broadcast stream; they are both just streams. Your scenario for using broadcast state is a special one. In the more general case, the state that needs to be broadcast is itself an unbounded stream, and the state events may be broadcast downstream at any time, so processing cannot wait for the broadcast to finish before playing the regular stream events. For your scenario:
Best,
Vino

Vasily Melnik <[hidden email]> wrote on Thu, Nov 14, 2019 at 1:28 PM:
Hi Vasily,

Unfortunately, this is a known issue with Flink; you can read the discussion under https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API .

At the moment I have seen 3 solutions for this issue:

1. Buffer the fact stream in local state until the broadcast stream is completely read.
2. Create a custom source for the fact stream and, in its open() method, wait until the broadcast stream is completely read.
3. With the latest Flink version, pre-populate state with the dimension data and start the Flink job from that existing state. You need to take care of setting the correct Kafka offsets for the dimension stream though, otherwise you will get a gap between the pre-populated state and the moment the job is started.

The first 2 solutions need a way to know when the broadcast stream is "completely read". I created a workaround for this with a custom source for the dimension events: it creates a "stop file" on a shared file system, reads the Kafka end offsets for the dimension topic via the admin interface, starts processing all messages from the beginning, and clears the "stop file" once the consumed offsets have reached the end offsets for all partitions. Instead of a "stop file" you could use a shared lock in ZooKeeper.

Hope this helps,
Maxim.

On Thu, Nov 14, 2019 at 7:42 AM vino yang <[hidden email]> wrote:
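For illustration, a minimal sketch of the end-offset snapshot that solution 2 and the "stop file" workaround above rely on, using the plain Kafka consumer API. The class name, bootstrap servers, topic name and group id are hypothetical placeholders:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DimensionEndOffsets {

    // Snapshot the current end offsets of the dimension topic. The dimension state
    // can be considered "completely read" once every partition has been consumed
    // up to the offset captured here.
    public static Map<TopicPartition, Long> snapshot(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "dimension-offset-probe"); // hypothetical group id

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            return consumer.endOffsets(partitions);
        }
    }
}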
Maxim, great thanks. We'll try buffering.

Best regards,

On Thu, 14 Nov 2019 at 19:36, Maxim Parkachov <[hidden email]> wrote: