Implementing samza table/stream join

Nick Dimiduk
Hello,

I'm interested in implementing a table/stream join, very similar to the one described in the "Table-stream join" section of the Samza key-value state documentation [0]. Conceptually, this would be an extension of the example provided in the javadocs for RichFunction#open [1], where I have a dataset of searchStrings instead of a single one. As per the Samza explanation, I would like to receive updates to this dataset via an operation log (à la a Kafka topic), so as to update my local state while the streaming job runs.

Perhaps you can further advise on a parallelization strategy for this operation. It seems to me that I'd want to partition the searchString database across multiple parallelization units and broadcast my input datastream to all those units. The idea is to maximize throughput on available hardware, though I would expect there to be a point at which the network becomes the bottleneck for the broadcast.
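
A minimal plain-Java sketch of the layout described above, independent of any Flink API: the search strings are split across parallel units, and every input record is sent to all of them. The class name `BroadcastSearchSketch`, the hash-based assignment of search strings to units, and substring matching via `String#contains` are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration: partitioned search strings, broadcast input records. */
final class BroadcastSearchSketch {
    // One set of search strings per simulated parallel unit.
    private final List<Set<String>> units = new ArrayList<>();

    BroadcastSearchSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            units.add(new HashSet<>());
        }
    }

    /** Each search string is stored in exactly one unit, chosen by hash. */
    void addSearchString(String searchString) {
        int unit = Math.floorMod(searchString.hashCode(), units.size());
        units.get(unit).add(searchString);
    }

    /** The record is "broadcast": every unit checks it against its own partition. */
    List<String> matches(String record) {
        List<String> hits = new ArrayList<>();
        for (Set<String> unit : units) {
            for (String searchString : unit) {
                if (record.contains(searchString)) {
                    hits.add(searchString);
                }
            }
        }
        return hits;
    }
}
```

In this layout every record visits every unit, so the number of record copies grows with the parallelism. That is the broadcast cost the paragraph above anticipates.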

Is there an example of how I might implement this in Flink-Streaming? I thought perhaps the DataStream#cross transformation would work, but I haven't worked out how to use it for my purpose. Thus far, I'm using the Java API.

Thanks a lot!
-n

Re: Implementing samza table/stream join

Fabian Hueske-2
Hi Nick,

I think you can do this with Flink in much the same way as explained in the Samza documentation, by using a stateful CoFlatMapFunction [1], [2].

Please have a look at this snippet [3].
This code implements an updatable stream filter. The first stream is filtered by words from the second stream. The filter operator adds words to, or removes them from, the filter as they arrive on the second stream. Both streams are partitioned by the filter word (the join key), so that each parallel task instance is responsible for only a subset of the filter words.
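
The linked snippet [3] is not reproduced here. As a rough stand-in, the following plain-Java sketch simulates the same idea without any Flink dependency. The method names `flatMap1` and `flatMap2` mirror Flink's CoFlatMapFunction, but `Consumer<String>` stands in for Flink's Collector, and the classes `FilterUpdate`, `UpdatableFilterTask` and `UpdatableFilterJob` are hypothetical. It also assumes that a word passes the filter when it is currently in the set, which is how a table/stream inner join would behave.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical update record: add or remove one filter word. */
final class FilterUpdate {
    final String word;
    final boolean add;

    FilterUpdate(String word, boolean add) {
        this.word = word;
        this.add = add;
    }
}

/** One simulated parallel task instance; it only sees the words routed to it. */
final class UpdatableFilterTask {
    private final Set<String> filterWords = new HashSet<>();

    /** First input (data stream): emit the word only if it is in the filter. */
    void flatMap1(String word, Consumer<String> out) {
        if (filterWords.contains(word)) {
            out.accept(word);
        }
    }

    /** Second input (update stream): change local state, emit nothing. */
    void flatMap2(FilterUpdate update, Consumer<String> out) {
        if (update.add) {
            filterWords.add(update.word);
        } else {
            filterWords.remove(update.word);
        }
    }
}

/** Routes both inputs by the same key so a word and its updates meet in one task. */
final class UpdatableFilterJob {
    private final List<UpdatableFilterTask> tasks = new ArrayList<>();

    UpdatableFilterJob(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new UpdatableFilterTask());
        }
    }

    private UpdatableFilterTask route(String key) {
        return tasks.get(Math.floorMod(key.hashCode(), tasks.size()));
    }

    void onData(String word, Consumer<String> out) {
        route(word).flatMap1(word, out);
    }

    void onUpdate(FilterUpdate update) {
        route(update.word).flatMap2(update, ignored -> { });
    }
}
```

Because both inputs are routed by the same key, no broadcast is needed: each data record is sent to exactly one task, and that task holds every update ever made for that word.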

Please let me know if you have questions.

Best,
Fabian

Re: Implementing samza table/stream join

Nick Dimiduk
Brilliant Fabian, thanks a lot! This looks exactly like what I'm after. One thing: the DataStream API I'm using (0.9.1) does not have a keyBy() method. Presumably this is from a newer API?

Re: Implementing samza table/stream join

Stephan Ewen
I would encourage you to use the 0.10 version of Flink. Streaming has seen some major improvements there.

The release is being voted on now; you can refer to these repositories for the release candidate code:



Greetings,
Stephan


Re: Implementing samza table/stream join

rmetzger0
In Flink 0.9.1, keyBy() is called groupBy(). We reworked the DataStream API between 0.9 and 0.10, which is why we had to rename the method.

Re: Implementing samza table/stream join

Nick Dimiduk
Yes, I observed the RC votes underway. I did wire up the 0.10 dependencies a couple of days back and saw there were API changes. I will continue to work toward stabilizing my prototype before moving to the new API; hopefully the timing will coincide with your release.

Thanks again for being such a communicative resource. I know how much work it is to keep up with the mailing lists. I do appreciate it.

-n
