Implementing tee functionality in a streaming job

Implementing tee functionality in a streaming job

Yury Ruchin
Hi all,

I have a streaming job in which, at some point, I want to duplicate the stream so that one copy ends up in a sink and the other continues down the processing pipeline. In other words, I want something similar to the Linux "tee" utility.

I tried a naive approach that looked like this:

val streamToMirror = env.addSource(mySource).<some operators here>
streamToMirror.addSink(someSink) // (1) tee 
streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further

But the above seems to result in the stream being split between the sink (1) and the further processing (2) in a nondeterministic way.

I could write through Kafka as shown in this pseudocode:

val headStream = env.addSource(mySource).<some operators here>
headStream.addSink(KafkaSink("myTopic"))
val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>
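For concreteness, the round trip I have in mind would look roughly like the sketch below (just a sketch, assuming the Kafka 0.9 connector and string-serialized records, with a small collection standing in for my real source - the actual connector, schema and topic in my job differ):

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment

// stand-in for the real source and the upstream operators
val headStream: DataStream[String] = env.fromElements("a", "b", "c")

// write the head of the pipeline out to Kafka
headStream.addSink(new FlinkKafkaProducer09[String]("localhost:9092", "myTopic", new SimpleStringSchema))

// read the same topic back and continue processing from there
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "tee-readback")
val tailStream = env.addSource(new FlinkKafkaConsumer09[String]("myTopic", new SimpleStringSchema, props))
tailStream.print()

env.execute("kafka-tee")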

But this would incur additional latency + deserialization overhead that I would like to avoid.

Is there any way to implement stream teeing as described?

Thanks,
Yury

Re: Implementing tee functionality in a streaming job

Fabian Hueske-2
Hi Yury,

your first approach should do exactly what you want.
An operator sends all outgoing records to all connected successor operators.
There should not be any non-deterministic behavior or splitting of records.
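For illustration, here is a minimal sketch (with a collection source standing in for your actual source) in which the same DataStream is consumed by two branches, and both receive every element:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// stand-in for the real source and upstream operators
val streamToMirror = env.fromElements(1, 2, 3, 4, 5)

// (1) tee: this sink receives every element ...
streamToMirror.print()

// (2) ... and the full stream also continues into the further pipeline
streamToMirror.map(_ * 10).print()

// both print sinks output all five elements
env.execute("tee-example")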

Can you share some example code that produces the non-deterministic behavior?

Best, Fabian

Re: Implementing tee functionality in a streaming job

Yury Ruchin
Thanks Fabian, I will try creating a toy job illustrating the issue and get back.

Re: Implementing tee functionality in a streaming job

Yury Ruchin
Well, it seems I figured it out. You're right, Fabian, it works the way you described. I wrote a simple test job:

import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(Seq.range(0, 100))

// (1) tee: one copy of the stream goes into a discarding sink
stream.addSink(new DiscardingSink[Int]).disableChaining()
// (2) the other copy is counted in a single 100-element window
stream.map { _ => 1 }.countWindowAll(100).sum(0).print().disableChaining()

env.setParallelism(2)
env.execute("tee-test-job")

I saw that "Records received" was 100 for both the DiscardingSink and the window operator. I also noticed that "Records out" for the fromCollection() source was 200 - and that was the key to understanding. In the original job I use a Kafka source, and I had been reading its "Records out" as the number of records it consumed, but that is not what it means. The correct interpretation is <Records Out> = <Records Consumed> * <Number of successor operators>.

An additional source of confusion was some inaccuracy in the sampled numbers: the "Records received" values of the successor operators were not exactly equal, which made me think they were receiving different portions of the stream. I believe that inaccuracy is intrinsic to sampling a live stream, so that's fine.

Re: Implementing tee functionality in a streaming job

Fabian Hueske-2
Thanks for reporting back, Yury!
Glad to hear that your use case is covered.

Cheers, Fabian

Re: Implementing tee functionality in a streaming job

Yury Ruchin
My bad: the "Records Out" in my previous message should read "Records sent", as it is labeled in the Flink UI.
