Dear Friends,

I have 2 streams of the below data types.

    DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
    DataStream<Tuple2<String, Double>> unionReloadsStream;

These streams get their data from Kafka, at different rates: "unionReloadsStream" will receive more data than "splittedActivationTuple". I need to store "splittedActivationTuple" in a window of 24 hours and manipulate its "Double" field whenever matching data comes from "unionReloadsStream" (the String field is the common field).

So I wrote the following method to do this task.

    public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
            DataStream<Tuple3<String, Integer, Double>> activationsStream,
            DataStream<Tuple2<String, Double>> unifiedReloadStream) {

        return activationsStream.join(unifiedReloadStream).where(new ActivationStreamSelector())
                .equalTo(new ReloadStreamSelector()).window(GlobalWindows.create())
                .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
                .apply(new JoinFunction<Tuple3<String, Integer, Double>, Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple3<String, Integer, Double> join(Tuple3<String, Integer, Double> first,
                            Tuple2<String, Double> second) {
                        return new Tuple3<String, Integer, Double>(first.f0, first.f1, first.f2 + second.f1);
                    }
                });
    }

and I call it as

    DataStream<Tuple3<String, Integer, Double>> activationWindowStream = joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
    activationWindowStream.print();

But nothing gets printed. I expected "activationWindowStream" to contain the "splittedActivationTuple" (smaller set) data, with the Double value accumulated whenever unionReloadsStream's incoming elements have a matching "String" field. That is not happening. What am I missing?

Thanks,
Rakkesh
|
Hi Rakkesh,
Did you call `execute()` on your `StreamExecutionEnvironment`?

Best,
Xingcan

> On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <[hidden email]> wrote:
|
Thanks for the reply. I have called "env.execute()", but nothing is getting printed. I doubt whether the function I implemented matches my requirement. Please assist.

On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <[hidden email]> wrote:
|
Hi Rakkesh,
The `GlobalWindow` is commonly used for custom window assignment, and you should specify a `trigger` for it [1]: the default trigger of `GlobalWindows` never fires, so a join over it never emits anything. If the built-in window joins in the DataStream API (e.g., tumbling or sliding windows) fail to meet your requirements, you could try the time-windowed join in the Table/SQL API [2].

Hope that helps.

Best,
Xingcan
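Xingcan's point can be illustrated without Flink by a toy model of a window operator: elements are buffered in the window, and results reach the sink only when the trigger says FIRE. `NEVER` below imitates the default trigger of `GlobalWindows.create()`, and `countTrigger` loosely imitates a count trigger that fires without purging. `ToyTrigger`, `NEVER`, `countTrigger` and `run` are hypothetical simplifications, not Flink's real `Trigger` API. Note also that the `TimeEvictor` in the question cannot rescue this: evictors only run when the trigger fires.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not the Flink API): one global window that buffers every element
// and only emits its contents when the trigger decides to FIRE.
public class GlobalWindowToy {

    // Hypothetical, simplified trigger: looks at the buffer size and decides whether to fire.
    interface ToyTrigger {
        boolean shouldFire(int bufferedCount);
    }

    // Imitates the default trigger of GlobalWindows, which never fires.
    static final ToyTrigger NEVER = count -> false;

    // Imitates a count trigger: fires each time another n elements have arrived (no purge).
    static ToyTrigger countTrigger(int n) {
        return count -> count % n == 0;
    }

    // Feeds all inputs into the single window and returns whatever was emitted downstream.
    static List<String> run(List<String> inputs, ToyTrigger trigger) {
        List<String> buffer = new ArrayList<>();
        List<String> emitted = new ArrayList<>();
        for (String element : inputs) {
            buffer.add(element);
            if (trigger.shouldFire(buffer.size())) {
                // On FIRE the window function sees the whole buffer; here we just emit it.
                emitted.add(String.join(",", buffer));
            }
        }
        // With NEVER, the buffer only grows and nothing is ever emitted.
        return emitted;
    }

    public static void main(String[] args) {
        List<String> inputs = List.of("a", "b", "c", "d");
        System.out.println(run(inputs, NEVER));           // []
        System.out.println(run(inputs, countTrigger(2))); // [a,b, a,b,c,d]
    }
}
```

The empty first result mirrors what happens to `activationWindowStream.print()`: the join is correctly wired, but no window ever fires.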
|
Thanks Xingcan. I chose a GlobalWindow because I want to keep all the elements arriving on splittedActivationTuple with a 24-hour expiry and then operate on them whenever elements arrive on "unionReloadsStream" (the bigger set).

On Wed, Jul 18, 2018 at 4:07 PM, Xingcan Cui <[hidden email]> wrote:
|
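Even with a firing trigger, a window join does not quite match the requirement Rakkesh describes: it calls the JoinFunction once per matching (activation, reload) pair in the same window, so several reloads for one key yield several separate outputs rather than one accumulated Double. The toy comparison below is plain Java, not Flink code; the `Activation` and `Reload` classes are hypothetical stand-ins for the `Tuple3` and `Tuple2` types in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy comparison (not Flink code): pairwise window-join semantics vs. a running total.
public class JoinVsAccumulate {

    // Hypothetical stand-in for Tuple3<String, Integer, Double>.
    static final class Activation {
        final String key;
        final int count;
        final double value;
        Activation(String key, int count, double value) {
            this.key = key;
            this.count = count;
            this.value = value;
        }
    }

    // Hypothetical stand-in for Tuple2<String, Double>.
    static final class Reload {
        final String key;
        final double amount;
        Reload(String key, double amount) {
            this.key = key;
            this.amount = amount;
        }
    }

    // What a window join produces once its window fires: one output per matching pair.
    static List<Double> pairwiseJoin(List<Activation> activations, List<Reload> reloads) {
        List<Double> out = new ArrayList<>();
        for (Activation a : activations) {
            for (Reload r : reloads) {
                if (a.key.equals(r.key)) {
                    out.add(a.value + r.amount); // same formula as the JoinFunction in the question
                }
            }
        }
        return out;
    }

    // What the question expects: each activation's Double accumulates all matching reloads.
    // Simplification: assumes at most one activation per key.
    static Map<String, Double> accumulate(List<Activation> activations, List<Reload> reloads) {
        Map<String, Double> totals = new HashMap<>();
        for (Activation a : activations) {
            totals.put(a.key, a.value);
        }
        for (Reload r : reloads) {
            totals.computeIfPresent(r.key, (k, v) -> v + r.amount); // reloads without an activation are ignored
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Activation> activations = List.of(new Activation("u1", 1, 10.0));
        List<Reload> reloads = List.of(new Reload("u1", 5.0), new Reload("u1", 2.0), new Reload("u2", 4.0));
        System.out.println(pairwiseJoin(activations, reloads)); // [15.0, 12.0]
        System.out.println(accumulate(activations, reloads));   // {u1=17.0}
    }
}
```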
Hi Rakkesh,

As Xingcan said, a window needs a trigger in order to FIRE. You can use a time window (which comes with a built-in trigger) or a ProcessFunction with state and timers.

Thanks,
vino.

2018-07-18 21:44 GMT+08:00 Titus Rakkesh <[hidden email]>:
|
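vino's second option (ProcessFunction + State + Timer) can be sketched as the single-threaded toy below. In a real Flink job one would presumably connect the two streams, key both by the String field, and implement the two callbacks in a `CoProcessFunction` using keyed `ValueState` and `ctx.timerService().registerEventTimeTimer(...)`. The class and method names here (`ActivationStateToy`, `onActivation`, `onReload`, `advanceTo`) are hypothetical, a `HashMap` stands in for keyed state, and a priority queue stands in for Flink's timer service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Toy model (not Flink code) of the "keyed state + timer" pattern: each activation is
// kept per key for 24 hours, and every matching reload is added to its Double field.
public class ActivationStateToy {

    static final long TTL_MILLIS = 24L * 60 * 60 * 1000; // 24 hours

    // Hypothetical per-key "state": the Integer field, the running Double, and the expiry time.
    static final class StoredActivation {
        final int count;
        double value;
        final long expiresAt;
        StoredActivation(int count, double value, long expiresAt) {
            this.count = count;
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, StoredActivation> state = new HashMap<>();
    // "Timers": (expiry time, key) pairs, earliest first.
    private final PriorityQueue<Map.Entry<Long, String>> timers =
            new PriorityQueue<>((x, y) -> Long.compare(x.getKey(), y.getKey()));
    final List<String> output = new ArrayList<>();

    // Plays the role of handling an element from splittedActivationTuple.
    void onActivation(String key, int count, double value, long timestamp) {
        advanceTo(timestamp);
        long expiresAt = timestamp + TTL_MILLIS;
        state.put(key, new StoredActivation(count, value, expiresAt));
        timers.add(Map.entry(expiresAt, key)); // like registering a timer for the expiry
    }

    // Plays the role of handling an element from unionReloadsStream.
    void onReload(String key, double amount, long timestamp) {
        advanceTo(timestamp); // fire any due timers before reading the state
        StoredActivation a = state.get(key);
        if (a != null) {
            a.value += amount; // accumulate into the stored activation
            output.add("(" + key + "," + a.count + "," + a.value + ")");
        }
    }

    // Fires every timer up to 'now', clearing state whose 24-hour lifetime is over.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().getKey() <= now) {
            Map.Entry<Long, String> timer = timers.poll();
            StoredActivation a = state.get(timer.getValue());
            // Clear only if the timer belongs to the current entry (a newer activation may have replaced it).
            if (a != null && a.expiresAt == timer.getKey()) {
                state.remove(timer.getValue());
            }
        }
    }

    public static void main(String[] args) {
        ActivationStateToy op = new ActivationStateToy();
        op.onActivation("u1", 1, 10.0, 0L);
        op.onReload("u1", 5.0, 1_000L);
        op.onReload("u1", 2.0, 2_000L);
        op.onReload("u2", 4.0, 3_000L);         // no activation for u2: nothing emitted
        op.onReload("u1", 1.0, TTL_MILLIS + 1); // activation expired: nothing emitted
        System.out.println(op.output);          // [(u1,1,15.0), (u1,1,17.0)]
    }
}
```

Emitting the updated activation on every reload is one design choice; emitting only the final value when the timer fires would be another, and which one fits depends on what downstream consumers need.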