Hi,

I have 2 streams which are partitioned based on key field. I want to join those streams based on key fields on windows. This is an example I saw in the flink website:

    val firstInput: DataStream[MyType] = ...
    val secondInput: DataStream[AnotherType] = ...

    val firstKeyed = firstInput.keyBy("userId")
    val secondKeyed = secondInput.keyBy("id")

    val result: DataStream[(MyType, AnotherType)] =
      firstKeyed.join(secondKeyed)
        onWindow(Time.of(5, SECONDS))

    val firstInput: KeyedStream[MyType] = ...
    val secondInput: KeyedStream[AnotherType] = ...
    val result: DataStream[(MyType, AnotherType)] =
      firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

and

    val firstInput: DataStream[MyType] = ...
    val secondInput: DataStream[AnotherType] = ...
    val result: DataStream[(MyType, AnotherType)] =
      firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

Thanks
Adrienne
Hey Adrienne!
On Wed, Oct 12, 2016 at 4:10 PM, Adrienne Kole <[hidden email]> wrote:
> Hi,
>
> I have 2 streams which are partitioned based on key field. I want to join
> those streams based on key fields on windows. This is an example I saw in
> the flink website:
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
>
> val firstKeyed = firstInput.keyBy("userId")
> val secondKeyed = secondInput.keyBy("id")
>
> val result: DataStream[(MyType, AnotherType)] =
>   firstKeyed.join(secondKeyed)
>     onWindow(Time.of(5, SECONDS))

This does not work. I could not find this example in the Flink docs. Do you remember where you found this? Would make sense to remove it. :-)

You have to go with the other approach you described (keyBy-join-where-equalTo-etc.).

It would make sense to provide the keyed stream join API though. If you like, you can open a JIRA issue for it (you would need to tell me your JIRA ID so I can add you as a contributor).

> val firstInput: KeyedStream[MyType] = ...
> val secondInput: KeyedStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>   firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
>
> and
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>   firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

Only if you need a specific KeyedDataStream operation, you would need to go with the KeyedStream type. There is no difference execution wise between the two examples.

– Ufuk
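For readers following along, here is a minimal sketch of the join-where-equalTo-window-apply chain mentioned above, written out in full. It assumes a Flink 1.x release in which the Scala DataStream API and `TumblingProcessingTimeWindows` are available. The case class fields, the sample elements, the 5-second tumbling processing-time window, and the job name are illustrative assumptions, not taken from the original post.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record types; only the key field names come from the thread.
case class MyType(userId: Long, name: String)
case class AnotherType(id: Long, value: String)

object WindowJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder inputs; a real job would read from an unbounded source.
    val firstInput: DataStream[MyType] =
      env.fromElements(MyType(1L, "a"), MyType(2L, "b"))
    val secondInput: DataStream[AnotherType] =
      env.fromElements(AnotherType(1L, "x"), AnotherType(2L, "y"))

    // Join the two plain DataStreams: where/equalTo select the join keys,
    // window assigns elements to windows, apply builds the joined result.
    val result: DataStream[(MyType, AnotherType)] = firstInput
      .join(secondInput)
      .where(_.userId)
      .equalTo(_.id)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .apply((left: MyType, right: AnotherType) => (left, right))

    result.print()
    env.execute("window join sketch")
  }
}
```

With the bounded placeholder inputs, the job may finish before the processing-time window fires, so it may print nothing. The sketch is meant to show the shape of the call chain, not a complete job.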
The example is at [1].

I have a few questions:

If there is no difference between a KeyedStream-KeyedStream join by key and a DataStream-DataStream join, then a DataStream effectively becomes a KeyedStream through the `where` and `equalTo` clauses. Please correct me if I am wrong.

Is the execution of windowed joins in Flink reduced to a single machine in the cluster? I ask because its throughput is quite low compared to other operations.

Thanks
Adrienne

On Thu, Oct 13, 2016 at 10:59 AM Ufuk Celebi <[hidden email]> wrote:
> Hey Adrienne!