.keyBy() on ConnectedStream


Matt
Hi all,

What's the purpose of .keyBy() on ConnectedStream? How does it affect .map() and .flatMap()?

I'm not finding a way to group stream elements based on a key, something like a Window on a normal Stream, but for a ConnectedStream.

Regards,
Matt

Re: .keyBy() on ConnectedStream

Timo Walther
Hi Matt,

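The keyBy() on connected streams takes two key arguments: one for the
left (first) stream and one for the right (second) stream. Elements with
the same key, from either stream, end up in the same parallel instance of
the CoMapFunction/CoFlatMapFunction. If you want to group both streams on
a common key, you can use .union() instead of .connect() (this requires
both streams to have the same element type).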
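To make the routing concrete, here is a plain-Java simulation. It does not use Flink's API. Each input has its own key selector, mirroring the two arguments to keyBy(), and elements whose keys match are routed to the same hypothetical `CoFlatMapInstance`, where `flatMap1` and `flatMap2` share per-key state. The routing function (`hashCode` modulo parallelism), the sample data, and all class and method names are assumptions for illustration; Flink's real key-group assignment differs. `unionAndCount` sketches the `.union()` alternative: both inputs have the same type and are grouped by one common key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java simulation of keyed connected streams; NOT Flink's API.
class KeyedConnectSketch {

    // Hypothetical routing: Flink assigns keys to key groups via its own hash;
    // hashCode modulo parallelism only illustrates the principle.
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // One parallel co-flatMap instance; both callbacks share per-key state.
    static class CoFlatMapInstance {
        final Map<String, Double> latestStatPerKey = new HashMap<>();
        final List<String> output = new ArrayList<>();

        // First input (the role of flatMap1): enrich an event with its key's stat.
        void flatMap1(String key, String action) {
            output.add(key + ":" + action + "@" + latestStatPerKey.get(key));
        }

        // Second input (the role of flatMap2): remember the latest stat per key.
        void flatMap2(String key, double stat) {
            latestStatPerKey.put(key, stat);
        }
    }

    static List<String> runConnected(int parallelism) {
        List<CoFlatMapInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) instances.add(new CoFlatMapInstance());

        List<String> events = List.of("alice:click", "bob:view", "carol:click");
        List<Map.Entry<String, Double>> stats = List.of(Map.entry("alice", 0.9), Map.entry("bob", 0.1));

        // Two key selectors, one per input, as with keyBy(key1, key2).
        Function<String, String> eventKey = e -> e.substring(0, e.indexOf(':'));
        Function<Map.Entry<String, Double>, String> statKey = Map.Entry::getKey;

        // Stats are fed first only to make the result deterministic; a real job
        // gives no ordering guarantee between the two inputs.
        for (Map.Entry<String, Double> s : stats) {
            String k = statKey.apply(s);
            instances.get(route(k, parallelism)).flatMap2(k, s.getValue());
        }
        for (String e : events) {
            String k = eventKey.apply(e);
            instances.get(route(k, parallelism)).flatMap1(k, e.substring(e.indexOf(':') + 1));
        }

        // Collect the outputs of all instances, sorted for a stable result.
        List<String> all = new ArrayList<>();
        for (CoFlatMapInstance inst : instances) all.addAll(inst.output);
        Collections.sort(all);
        return all;
    }

    // union(): both inputs share one element type, so a single key selector
    // groups elements from both, like keyBy on one stream.
    static Map<String, Integer> unionAndCount(List<String> first, List<String> second) {
        List<String> unioned = new ArrayList<>(first);
        unioned.addAll(second);
        Map<String, Integer> countsPerKey = new TreeMap<>();
        for (String e : unioned) countsPerKey.merge(e.substring(0, e.indexOf(':')), 1, Integer::sum);
        return countsPerKey;
    }
}
```

Because routing depends only on the key, `runConnected(1)` and `runConnected(4)` return the same sorted output: `alice:click@0.9`, `bob:view@0.1`, `carol:click@null`. The last entry shows a key that only ever appeared on the first input.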

I hope that helps.

Timo


(In reply to Matt's message of 27/01/17 at 07:21.)



Re: .keyBy() on ConnectedStream

Matt
Aha, ok, got it!

I just realized that this ConnectedStream I was talking about (A) depends on another ConnectedStream (B), which depends on the first one (A). So it's even trickier than I first thought.

For instance (simplified):

predictionStream = input
  .connect(statsStream)   // statsStream is only defined below: this is the cycle
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
     flatMap1(obj, output) {
         p = prediction(obj)
         output.collect(p)
     }
     flatMap2(stat, output) {
         updateModel(stat)
     }
  })

statsStream = input2
  .connect(predictionStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
     flatMap1(obj2, output) {
        s = getStats(obj2, p)   // p: the latest prediction, stored by flatMap2
        output.collect(s)
     }
     flatMap2(prediction, output) {
        p = prediction
     }
  })

I'm guessing this should be possible. One way would be to add a sink on statsStream that writes its elements to Kafka, and have predictionStream read from that topic instead of initializing it with a reference to statsStream. I would rather avoid writing to Kafka unnecessarily, though.
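The Kafka workaround amounts to breaking the cycle with an external buffer: predictionStream feeds statsStream directly, while statsStream's output is written to a topic that predictionStream reads back. The following is a single-threaded, in-memory sketch in plain Java, with no Flink or Kafka involved. The `ArrayDeque` stands in for the Kafka topic, the lockstep processing order is an assumption, and the toy logic behind `prediction`, `updateModel` and `getStats` (and every class name) is a hypothetical placeholder.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory sketch of "sink statsStream to a topic and read it back".
// NOT Flink or Kafka code; model and statistics logic are placeholders.
class FeedbackViaTopicSketch {

    // Co-flatMap behind predictionStream: input on side 1, stats from the topic on side 2.
    static class PredictionOp {
        double modelBias = 0.0; // hypothetical model state
        double flatMap1(double obj) { return obj + modelBias; } // stands in for prediction(obj)
        void flatMap2(double stat) { modelBias = stat; }        // stands in for updateModel(stat)
    }

    // Co-flatMap behind statsStream: input2 on side 1, predictions on side 2.
    static class StatsOp {
        double lastPrediction = 0.0; // the "p" of the pseudocode
        double flatMap1(double obj2) { return obj2 - lastPrediction; } // stands in for getStats(obj2, p)
        void flatMap2(double prediction) { lastPrediction = prediction; }
    }

    // Processes both inputs in lockstep (an assumption; real streams interleave
    // freely). Assumes input2 is at least as long as input.
    static List<Double> run(double[] input, double[] input2) {
        Deque<Double> statsTopic = new ArrayDeque<>(); // stands in for the Kafka topic
        PredictionOp predictionOp = new PredictionOp();
        StatsOp statsOp = new StatsOp();
        List<Double> predictions = new ArrayList<>();

        for (int i = 0; i < input.length; i++) {
            // predictionStream consumes whatever stats have reached the topic so far.
            while (!statsTopic.isEmpty()) predictionOp.flatMap2(statsTopic.poll());
            double p = predictionOp.flatMap1(input[i]);
            predictions.add(p);
            // predictions flow into statsStream directly...
            statsOp.flatMap2(p);
            // ...while statsStream's output goes to the topic, not straight back.
            statsTopic.add(statsOp.flatMap1(input2[i]));
        }
        return predictions;
    }
}
```

With `input = {1.0, 2.0, 3.0}` and `input2 = {1.5, 2.5, 3.0}`, `run` returns `[1.0, 2.5, 3.0]`: each stat only influences the next prediction, after it has made the round trip through the topic.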

Is there any other way to achieve this?

Thanks,
Matt

(In reply to Timo's message of Fri, Jan 27, 2017 at 6:35 AM.)




Re: .keyBy() on ConnectedStream

Matt
I'll create a new thread with my last message, since it's not really related to the original question here.

(In reply to my message of Sat, Jan 28, 2017 at 11:55 AM.)