Hi all,
I have a streaming job in which I want to duplicate the stream at some point, so that one copy of it ends up in a sink and another one continues down the processing pipeline. This way I want to implement something similar to the "tee" Linux utility. I tried a naive approach that looked like this:

    val streamToMirror = env.addSource(mySource).<some operators here>
    streamToMirror.addSink(someSink) // (1) tee
    streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further

But the above results in the stream being split between the sink (1) and the further processors (2) in a seemingly nondeterministic way. I could write through Kafka instead, as shown in this pseudocode:

    val headStream = env.addSource(mySource).<some operators here>
    headStream.addSink(KafkaSink("myTopic"))
    val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>

But this would incur additional latency and deserialization overhead that I would like to avoid. Is there any way to implement stream teeing as described?

Thanks,
Yury
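For concreteness, the tee pattern above can be written as a self-contained job. Below is a minimal sketch against the Scala DataStream API; the collection source, the filter, and the two print() sinks are hypothetical stand-ins for the elided operators and sinks in the pseudocode:

    import org.apache.flink.streaming.api.scala._

    object TeeSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Hypothetical source and operator standing in for
        // env.addSource(mySource).<some operators here>.
        val streamToMirror: DataStream[Int] = env
          .fromCollection(Seq.range(0, 100))
          .filter(_ % 2 == 0)

        // (1) tee: first consumer of the stream.
        streamToMirror.print()

        // (2) process further: second consumer of the same stream.
        streamToMirror.map(_ * 10).print()

        env.execute("tee-sketch")
      }
    }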
Hi Yury, your first approach should solve your problem exactly: calling addSink() and applying further transformations on the same DataStream sends every record to both consumers.

2016-12-20 10:50 GMT+01:00 Yury Ruchin <[hidden email]>:
Thanks, Fabian, I will try creating a toy job illustrating the issue and get back.

2016-12-20 12:58 GMT+03:00 Fabian Hueske <[hidden email]>:
Well, it seems I figured it out. You're right, Fabian, it works the way you described. I wrote a simple test job:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the default parallelism before building the topology.
    env.setParallelism(2)

    val stream = env.fromCollection(Seq.range(0, 100))

    // Two consumers of the same stream: a discarding sink and a windowed count.
    stream.addSink(new DiscardingSink[Int]).disableChaining()
    stream.map { _ => 1 }.countWindowAll(100).sum(0).print().disableChaining()

    env.execute("tee-test-job")

I saw that "Records received" was 100 for both the DiscardingSink and the window operator. I also noticed that "Records out" for the fromCollection() source was 200 - and that was the key to understanding. In the original job I use a Kafka source, and I treated its "Records out" as the number of records consumed by it, but that is not true. The correct interpretation is: <Records Out> = <Records Consumed> * <Number of successor operators>. In this test job that is 100 records consumed * 2 successor operators = 200 records out. An additional source of confusion for me was some inaccuracy in the sampled numbers - the "Records received" values of the successor operators were not exactly equal, which made me think they received different portions of the stream. I believe the inaccuracy is somewhat intrinsic to live stream sampling, so that's fine.

2016-12-20 14:35 GMT+03:00 Yury Ruchin <[hidden email]>:
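To double-check the fan-out behavior without relying on the UI metrics, the per-consumer counts can be made explicit in the job itself. Here is a sketch in the same vein as the test job above; the second branch's extra map is an illustrative placeholder for real processing:

    import org.apache.flink.streaming.api.scala._

    object TeeVerify {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(2)

        val stream = env.fromCollection(Seq.range(0, 100))

        // Consumer 1: count the records it receives.
        stream.map(_ => 1).countWindowAll(100).sum(0).print()

        // Consumer 2: transform first, then count.
        stream.map(_ * 2).map(_ => 1).countWindowAll(100).sum(0).print()

        // Both branches print 100: each consumer receives the full
        // stream, so the source reports 100 * 2 = 200 records out.
        env.execute("tee-verify")
      }
    }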
Thanks for reporting back, Yury! Glad to hear that your use case is covered.

2016-12-20 16:42 GMT+01:00 Yury Ruchin <[hidden email]>:
My bad - "Records Out" in the previous message should be read as "Records sent", as per the Flink UI.

2016-12-20 18:42 GMT+03:00 Yury Ruchin <[hidden email]>: