ConnectedStream keyby issues

Tarek khal
Hi,
I have two Kafka topics (tracking and rules), and I would like to join the "tracking" DataStream with the "rules" DataStream as data arrives in the "tracking" stream.

The problem with a windowed join is that the rules only “survive” for the length of the window, whereas I want them to survive longer so that they can be applied to events arriving in the future.

I tried ConnectedStreams with a CoFlatMapFunction, but the result is not what I expected.

For the execution:

1) I added 3 rules to the "rules" topic (imei: "01", "02", "03").
2) I sent 15 events with different IMEIs, but I suspect I have a problem with "keyBy".

Result:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n12999/2222222.jpg>

Code:

ConnectedStreams<TrackEvent, RulesEvent> connectedStreams =
        inputEventStream.connect(inputRulesStream).keyBy("imei", "imei");

DataStream<Tuple2<TrackEvent, RulesEvent>> ds = connectedStreams.flatMap(
        new CoFlatMapFunction<TrackEvent, RulesEvent, Tuple2<TrackEvent, RulesEvent>>() {
            // Plain instance field: one object for every key this operator instance processes
            Tuple2<TrackEvent, RulesEvent> t2 = new Tuple2<TrackEvent, RulesEvent>();

            @Override
            public void flatMap1(TrackEvent trackEvent, Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                // Pairs the event with whichever rule was stored last, regardless of its imei
                t2.f0 = trackEvent;
                collector.collect(t2);
                // t2 = new Tuple2<TrackEvent, RulesEvent>();
            }

            @Override
            public void flatMap2(RulesEvent rulesEvent, Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                t2.f1 = rulesEvent;
                // collector.collect(t2);
            }
        });
ds.printToErr();

Best,


Re: ConnectedStream keyby issues

Jason Brelloch
I think the issue is that t2 is not registered as keyed state, so it is shared across all of the keys on that TaskManager. Take a look at this article:


Basically, you need to change t2 to a ValueState<Tuple2<TrackEvent, RulesEvent>> and register it with a ValueStateDescriptor in the function's open() method (which means extending RichCoFlatMapFunction instead of CoFlatMapFunction). Then access it using t2.value() and t2.update().

Hopefully that helps.
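To see why the plain field misbehaves, here is a minimal plain-Java model of one operator instance (no Flink dependency, so it is only an analogy; `SharedFieldVsKeyedState`, `RuleJoin`, and `run` are hypothetical names). A `HashMap` keyed by IMEI stands in for Flink's keyed `ValueState`, which the runtime scopes to the current record's key automatically.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SharedFieldVsKeyedState {

    // Stand-in for the two callbacks of the CoFlatMapFunction.
    interface RuleJoin {
        void onRule(String imei, String rule);     // like flatMap2
        String onEvent(String imei, String event); // like flatMap1
    }

    // Mirrors the original code: one field per operator instance, the key is ignored.
    static class SharedFieldJoin implements RuleJoin {
        private String lastRule; // plays the role of t2.f1

        public void onRule(String imei, String rule) { lastRule = rule; }

        public String onEvent(String imei, String event) { return event + "->" + lastRule; }
    }

    // Mirrors keyed ValueState: the stored rule is scoped to the record's key.
    static class KeyedStateJoin implements RuleJoin {
        private final Map<String, String> ruleByImei = new HashMap<>();

        public void onRule(String imei, String rule) { ruleByImei.put(imei, rule); }

        public String onEvent(String imei, String event) { return event + "->" + ruleByImei.get(imei); }
    }

    // Sends rules for IMEIs 01..03 first, then one event for each of 01..04.
    static List<String> run(RuleJoin join) {
        for (String imei : new String[] {"01", "02", "03"}) {
            join.onRule(imei, "rule" + imei);
        }
        List<String> out = new ArrayList<>();
        for (String imei : new String[] {"01", "02", "03", "04"}) {
            out.add(join.onEvent(imei, "event" + imei));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("shared field: " + run(new SharedFieldJoin()));
        System.out.println("keyed state:  " + run(new KeyedStateJoin()));
    }
}
```

With the shared field every event is paired with `rule03` (the last rule written), while the per-key version pairs `event01` with `rule01` and so on, and `event04` gets `null` because no rule exists for that IMEI. In the real job, the map's role is played by a `ValueState` obtained from a `ValueStateDescriptor` in `open()`.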

On Thu, May 4, 2017 at 9:17 AM, Tarek khal <[hidden email]> wrote:

--
Jason Brelloch | Product Developer
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305 

Re: ConnectedStream keyby issues

Tarek khal
Hi Jason,

Thank you very much for your help; it solved my problem.

Best regards,

Re: ConnectedStream keyby issues

yunfan123
In reply to this post by Jason Brelloch
But what happens if some data can never be merged?
Will the state be kept forever?
Can I set a timeout?

Re: ConnectedStream keyby issues

Aljoscha Krettek
Hi,
Yes, if you use ProcessFunction or CoProcessFunction you can (and should) set a timer to clean up state.

Best,
Aljoscha
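A plain-Java model of the timer-based cleanup (no Flink dependency; `TimedKeyedStore`, `put`, and `advanceTime` are hypothetical names): each write stores a value under its key and arms a cleanup deadline `ttl` time units later, and when time passes the deadline the entry is removed, much as `onTimer()` would clear the key's state. In a real ProcessFunction every registered timer fires (Flink keeps at most one timer per key and timestamp), so code that re-arms the timer on each element usually stores the latest deadline in state and ignores older timers in `onTimer()`; the deadline map below plays that role.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class TimedKeyedStore {
    private final long ttl;
    private final Map<String, String> valueByKey = new HashMap<>();
    // Latest cleanup deadline per key; a newer write simply overwrites the older deadline.
    private final Map<String, Long> deadlineByKey = new HashMap<>();

    public TimedKeyedStore(long ttl) { this.ttl = ttl; }

    // Like processElement: update the keyed value and (re)arm its cleanup timer.
    public void put(String key, String value, long now) {
        valueByKey.put(key, value);
        deadlineByKey.put(key, now + ttl);
    }

    // Like time advancing: fire every due deadline and clear that key's state.
    public void advanceTime(long now) {
        Iterator<Map.Entry<String, Long>> it = deadlineByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= now) {
                valueByKey.remove(e.getKey()); // the equivalent of clearing state in onTimer
                it.remove();
            }
        }
    }

    public String get(String key) { return valueByKey.get(key); }

    public int size() { return valueByKey.size(); }

    public static void main(String[] args) {
        TimedKeyedStore store = new TimedKeyedStore(100);
        store.put("01", "rule01", 0);  // expires at 100
        store.put("02", "rule02", 50); // expires at 150
        store.advanceTime(120);
        System.out.println(store.get("01") + " " + store.get("02") + " size=" + store.size());
    }
}
```

Running `main` prints `null rule02 size=1`: only the entry whose deadline has passed is gone, so unmatched data does not accumulate forever.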

> On 7. May 2017, at 09:28, yunfan123 <[hidden email]> wrote:

Re: ConnectedStream keyby issues

yunfan123
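private static class MergeFunction extends RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer, ObjectNode>> {
    // "interval" is not defined in the original snippet; this value is only a hypothetical placeholder
    private final long interval = 60_000L;

    private ValueState<Tuple2<Integer, ObjectNode>> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        // A TypeHint captures the generic Tuple2 type for the state descriptor
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("mystate",
                TypeInformation.of(new TypeHint<Tuple2<Integer, ObjectNode>>() {})));
    }

    @Override
    public void processElement(Tuple2<Integer, ObjectNode> tuple2, Context context, Collector<Tuple2<Integer, ObjectNode>> collector) throws Exception {
        //XXXXXX
        // A wall-clock delay belongs with a processing-time timer; event-time timers fire on watermarks
        context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime() + interval);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext onTimerContext, Collector<Tuple2<Integer, ObjectNode>> collector) throws Exception {
        // Emit whatever is still stored for this key when its timer fires
        Tuple2<Integer, ObjectNode> value = state.value();
        if (value != null) {
            collector.collect(value);
        }
    }
}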

My understanding is this:

If I set a timer in my class that extends RichProcessFunction, the MergeFunction object can be garbage-collected once the onTimer function has been called.
If I don't set an event-time timer in processElement, it can be garbage-collected as soon as processElement returns.

Re: ConnectedStream keyby issues

yunfan123
In the example above, it seems I should clear the state in the onTimer function in order to free resources, like this:

public void onTimer(long l, OnTimerContext onTimerContext, Collector<Tuple2<Integer, ObjectNode>> collector) throws Exception {
    if (state.value() != null) {
        collector.collect(state.value());
        state.update(null);
    }
}

Re: ConnectedStream keyby issues

Aljoscha Krettek
Yes, that looks right.

> On 10. May 2017, at 14:56, yunfan123 <[hidden email]> wrote:
>
> In upstairs example, it seems I should clear the state in onTimer function in
> order to free resource like follows:
> public void onTimer(long l, OnTimerContext onTimerContext,
> Collector<Tuple2&lt;Integer, ObjectNode>> collector) throws Exception {
>            if (state.value() != null) {
>                collector.collect(state.value());
>                state.update(null);
>            }
>        }
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-keyby-issues-tp12999p13090.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: ConnectedStream keyby issues

gaurav
Hello

I am a little confused about when the state will be garbage-collected. For example:

Example 1:

class abc extends RichProcessFunction<Tuple<>, Tuple<>>
{
    public void processElement(......)
    {
        if (timer never set)
        {
            ctx.timerService().registerEventTimeTimer(...);
        }
    }

    public void onTimer(.....)
    {
        // do some work ....
        ctx.timerService().registerEventTimeTimer(...);
    }
}

In Example 1, will the state ever be garbage-collected? Also, in Example 1, processElement registers an event-time timer only once. Will the state be garbage-collected when the second event arrives?

And if we have:

Example 2:

public void onTimer(.....)
{
    // do some work ....
    // no timer registration
}

Will the state be garbage-collected after onTimer completes?


Re: ConnectedStream keyby issues

Renjie Liu
In reply to this post by Tarek khal
@Jason I think there is a mistake in your explanation: each task in the TaskManager has its own copy of the operator instance, so the tuple is not necessarily shared across the whole TaskManager. State is a great solution, but I don't think that is the root cause.

@Tarek What is the parallelism of your data stream? I suspect the reason is that the parallelism is 1.
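Renjie's point can be sketched in plain Java (no Flink dependency; `PerInstanceField`, `route`, and `pairEvents` are hypothetical names, and the modulo routing is only a stand-in for Flink's own key-group hashing): each parallel instance owns its own copy of the plain field, and a given key is always routed to the same instance. With parallelism 1 all three IMEIs overwrite one field; with parallelism 2 this particular modulo puts "01" and "03" on the same instance, so they still interfere; with parallelism 3 each key happens to land on its own instance.

```java
import java.util.ArrayList;
import java.util.List;

public class PerInstanceField {
    // Stand-in for keyBy routing: a key always goes to the same instance index.
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Pairs each event with the plain field of the instance it is routed to.
    static List<String> pairEvents(String[] ruleKeys, String[] eventKeys, int parallelism) {
        String[] lastRule = new String[parallelism]; // one "t2.f1" per parallel instance
        for (String key : ruleKeys) {
            lastRule[route(key, parallelism)] = "rule" + key; // overwrites other keys on that instance
        }
        List<String> out = new ArrayList<>();
        for (String key : eventKeys) {
            out.add("event" + key + "->" + lastRule[route(key, parallelism)]);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"01", "02", "03"};
        for (int p : new int[] {1, 2, 3}) {
            System.out.println("p=" + p + ": " + pairEvents(keys, keys, p));
        }
    }
}
```

So a higher parallelism can hide the problem for some key sets, but it does not fix it; keys that share an instance still share the field, which is why keyed state is still the right solution.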

On Thu, May 4, 2017 at 10:39 PM Tarek khal <[hidden email]> wrote:
--
Liu, Renjie
Software Engineer, MVAD

Re: ConnectedStream keyby issues

Tarek khal
Hello Renjie,

Yes, the parallelism is 1. What should I do?

Regards,

Re: ConnectedStream keyby issues

Renjie Liu

Jason's solution is right; I'm just correcting the mistake in the explanation.


On Fri, May 19, 2017 at 7:11 PM, Tarek khal <[hidden email]> wrote:

Re: ConnectedStream keyby issues

Tarek khal
If I increase the operator parallelism, do I risk breaking the shared-state solution, or is that unrelated?
And if increasing it is an advantage, what limits it?

I am new to this framework and find some of its concepts difficult.

Best Regards,

Re: ConnectedStream keyby issues

Renjie Liu
Even if you increase the operator parallelism, you can still use keyed state.
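A plain-Java model of why keyed state keeps working at any parallelism (no Flink dependency; `KeyedStateAcrossParallelism` and `join` are hypothetical names, and the modulo routing again only stands in for Flink's key-group assignment): each instance keeps a map for just the keys routed to it, and because a key is always routed to the same instance, the joined output is identical for parallelism 1, 2, and 4.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedStateAcrossParallelism {
    // Stand-in for keyBy routing.
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static List<String> join(String[] ruleKeys, String[] eventKeys, int parallelism) {
        // One keyed-state map per parallel instance; each instance only sees its own keys.
        List<Map<String, String>> statePerInstance = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            statePerInstance.add(new HashMap<>());
        }
        for (String key : ruleKeys) {
            statePerInstance.get(route(key, parallelism)).put(key, "rule" + key);
        }
        List<String> out = new ArrayList<>();
        for (String key : eventKeys) {
            String rule = statePerInstance.get(route(key, parallelism)).get(key);
            out.add("event" + key + "->" + rule);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] rules = {"01", "02", "03"};
        String[] events = {"01", "02", "03", "04"};
        for (int p : new int[] {1, 2, 4}) {
            System.out.println("p=" + p + ": " + join(rules, events, p));
        }
    }
}
```

The practical limit on parallelism is then the usual one (for example, how many distinct keys there are to spread across instances), not the correctness of the join.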

On Fri, May 19, 2017 at 7:47 PM Tarek khal <[hidden email]> wrote:

Re: ConnectedStream keyby issues

Aljoscha Krettek
In reply to this post by gaurav
Hi,

State is never garbage-collected automatically. You have to clear it yourself in the onTimer() callback, as mentioned earlier.

Best,
Aljoscha
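A plain-Java model of a single key (no Flink dependency; `StateOutlivesTimers` and `OnTimerBehaviour` are hypothetical names) makes the answer to both examples concrete: the stored value stays until onTimer explicitly clears it, whether the timer keeps re-registering itself (Example 1) or simply stops (Example 2). Whether the function object is garbage-collected is a separate question, since keyed state lives in the state backend rather than in the function instance.

```java
import java.util.PriorityQueue;

public class StateOutlivesTimers {
    // What onTimer does in each scenario from the question.
    enum OnTimerBehaviour { REREGISTER, NOTHING, CLEAR }

    private final OnTimerBehaviour behaviour;
    private final long interval;
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private String state; // value stored for one key; null means no state
    int firedTimers = 0;

    StateOutlivesTimers(OnTimerBehaviour behaviour, long interval) {
        if (interval <= 0) throw new IllegalArgumentException("interval must be positive");
        this.behaviour = behaviour;
        this.interval = interval;
    }

    // Like processElement: store the value and register a timer only if none is pending.
    void processElement(String value, long now) {
        state = value;
        if (timers.isEmpty()) {
            timers.add(now + interval);
        }
    }

    // Like the watermark advancing: fire every timer due at or before 'now'.
    void advanceTime(long now) {
        while (!timers.isEmpty() && timers.peek() <= now) {
            long ts = timers.poll();
            firedTimers++;
            switch (behaviour) {
                case REREGISTER: // Example 1: the timer re-arms itself, state is never cleared
                    timers.add(ts + interval);
                    break;
                case NOTHING:    // Example 2: no re-registration, but no cleanup either
                    break;
                case CLEAR:      // explicit cleanup, as suggested earlier in the thread
                    state = null;
                    break;
            }
        }
    }

    boolean hasState() { return state != null; }

    public static void main(String[] args) {
        for (OnTimerBehaviour b : OnTimerBehaviour.values()) {
            StateOutlivesTimers op = new StateOutlivesTimers(b, 10);
            op.processElement("value", 0);
            op.advanceTime(1000);
            System.out.println(b + ": fired=" + op.firedTimers + ", hasState=" + op.hasState());
        }
    }
}
```

Only the CLEAR variant ends with no state; REREGISTER fires 100 times between 0 and 1000 and still holds the value, and NOTHING fires once and also keeps it.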

> On 19. May 2017, at 10:39, gaurav <[hidden email]> wrote: