Keyed join Flink Streaming

Keyed join Flink Streaming

Adrienne Kole
Hi,

I have two streams that are partitioned by a key field, and I want to join them on those key fields over windows. This is an example I saw on the Flink website:

val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
 
val firstKeyed = firstInput.keyBy("userId")
val secondKeyed = secondInput.keyBy("id")
 
val result: DataStream[(MyType, AnotherType)] =
   firstKeyed.join(secondKeyed)
   onWindow(Time.of(5, SECONDS))

However, with the current Flink version (1.1.2) I cannot do this. Whether or not the streams are keyed, I still have to specify the "where" and "equalTo" clauses.

My question is: how can I implement keyed window joins in Flink streaming? And is there a difference between:

val firstKeyed: KeyedStream[MyType, _] = ...
val secondKeyed: KeyedStream[AnotherType, _] = ...
val result: DataStream[(MyType, AnotherType)] =
   firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

and

val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
val result: DataStream[(MyType, AnotherType)] =
   firstInput.join(secondInput).where(..).equalTo(..).window(..).apply(..)


Thanks
Adrienne


Re: Keyed join Flink Streaming

Ufuk Celebi
Hey Adrienne!

On Wed, Oct 12, 2016 at 4:10 PM, Adrienne Kole <[hidden email]> wrote:

> Hi,
>
> I have two streams that are partitioned by a key field, and I want to join
> them on those key fields over windows. This is an example I saw on
> the Flink website:
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
>
> val firstKeyed = firstInput.keyBy("userId")
> val secondKeyed = secondInput.keyBy("id")
>
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed)
>    onWindow(Time.of(5, SECONDS))

This does not work. I could not find this example in the Flink docs.
Do you remember where you found this? Would make sense to remove it.
:-)

You have to go with the other approach you described
(keyBy-join-where-equalTo-etc.). It would make sense to provide the
keyed stream join API though. If you like, you can open a JIRA issue
for it (you would need to tell me your JIRA ID so I can add you as a
contributor).
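
For reference, here is a minimal sketch of that where/equalTo approach in the Scala DataStream API. The fields of MyType and AnotherType, the sample elements, and the 5-second tumbling processing-time window are assumptions made for illustration; they are not part of the original example.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record types; only the key fields "userId" and "id" come from the thread.
case class MyType(userId: Long, name: String)
case class AnotherType(id: Long, value: String)

object WindowJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder sources. A real job would read from unbounded sources;
    // with bounded toy input the job may finish before a window fires.
    val firstInput: DataStream[MyType] =
      env.fromElements(MyType(1L, "a"), MyType(2L, "b"))
    val secondInput: DataStream[AnotherType] =
      env.fromElements(AnotherType(1L, "x"), AnotherType(2L, "y"))

    // The join key is given by where/equalTo; no explicit keyBy is needed.
    val result: DataStream[(MyType, AnotherType)] =
      firstInput.join(secondInput)
        .where(_.userId)
        .equalTo(_.id)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .apply((left, right) => (left, right))

    result.print()
    env.execute("window join sketch")
  }
}
```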

> val firstKeyed: KeyedStream[MyType, _] = ...
> val secondKeyed: KeyedStream[AnotherType, _] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
>
> and
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>    firstInput.join(secondInput).where(..).equalTo(..).window(..).apply(..)

You only need the KeyedStream type if you need a KeyedStream-specific
operation. Execution-wise, there is no difference between the two
examples.
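
One way to see why the two variants behave the same: the key used for the join comes from where/equalTo, not from any earlier keyBy. The plain-Scala model below (no Flink dependency) sketches those semantics. It assumes tumbling windows over an explicit timestamp field `ts`, and the names `Tagged`, `windowStart` and `windowJoin` are hypothetical. It is an illustration of the semantics, not Flink's actual implementation.

```scala
object KeyedWindowJoinModel {
  // Hypothetical record types; "ts" is an assumed timestamp in milliseconds.
  final case class MyType(userId: Long, ts: Long, name: String)
  final case class AnotherType(id: Long, ts: Long, value: String)

  // An element from either input, tagged with the side it came from.
  type Tagged = Either[MyType, AnotherType]

  // Start of the tumbling window that contains timestamp ts.
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  def windowJoin(
      first: Seq[MyType],
      second: Seq[AnotherType],
      sizeMs: Long): Seq[(MyType, AnotherType)] = {
    // "where": extract the key of the first input and tag the element.
    val taggedFirst: Seq[((Long, Long), Tagged)] =
      first.map(e => ((e.userId, windowStart(e.ts, sizeMs)), Left(e)))
    // "equalTo": extract the key of the second input and tag the element.
    val taggedSecond: Seq[((Long, Long), Tagged)] =
      second.map(e => ((e.id, windowStart(e.ts, sizeMs)), Right(e)))

    // Group the union by (key, window), then pair up both sides per group.
    (taggedFirst ++ taggedSecond)
      .groupBy(_._1)
      .toSeq
      .sortBy(_._1)
      .flatMap { case (_, group) =>
        val lefts = group.collect { case (_, Left(l)) => l }
        val rights = group.collect { case (_, Right(r)) => r }
        for (l <- lefts; r <- rights) yield (l, r)
      }
  }

  def main(args: Array[String]): Unit = {
    val first = Seq(MyType(1L, 1000L, "a"), MyType(1L, 6000L, "b"), MyType(2L, 2000L, "c"))
    val second = Seq(AnotherType(1L, 4000L, "x"), AnotherType(2L, 7000L, "y"))
    // Only "a" and "x" share both key 1 and the window starting at 0.
    windowJoin(first, second, 5000L).foreach(println)
  }
}
```

Because the grouping key is derived inside `windowJoin`, it makes no difference whether the inputs were already partitioned by that key beforehand.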

– Ufuk

Re: Keyed join Flink Streaming

Adrienne Kole


Hi Ufuk,
Thanks for the reply.

The example is at [1]. I have a few questions:

If there is no difference between a KeyedStream-KeyedStream join by key and a DataStream-DataStream join, then a DataStream effectively becomes a KeyedStream through the `where` and `equalTo` clauses. Please correct me if I am wrong.

Is the execution of windowed joins in Flink reduced to a single machine in the cluster? I ask because their throughput is quite low compared to other operations.
Thanks
Adrienne
