Hi,
I have two Kafka topics (tracking and rules), and I would like to join the "tracking" datastream with the "rules" datastream as data arrives on the "tracking" stream. The problem with a window join is that the rules only “survive” for the length of the window, whereas I want them to survive longer so that they can be applied to events arriving in the future.

I tested ConnectedStreams with a CoFlatMapFunction, but the result is not what I expected. For the test run:
1) I added 3 rules to the "rules" topic (imei: "01", "02", "03").
2) I produced 15 events with different imei values.

I guess I have a problem with "keyBy".

Result:

Code:

ConnectedStreams<TrackEvent, RulesEvent> connectedStreams =
        inputEventStream.connect(inputRulesStream).keyBy("imei", "imei");

DataStream<Tuple2<TrackEvent, RulesEvent>> ds = connectedStreams.flatMap(
        new CoFlatMapFunction<TrackEvent, RulesEvent, Tuple2<TrackEvent, RulesEvent>>() {

    Tuple2<TrackEvent, RulesEvent> t2 = new Tuple2<TrackEvent, RulesEvent>();

    @Override
    public void flatMap1(TrackEvent trackEvent, Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
        t2.f0 = trackEvent;
        collector.collect(t2);
        // t2 = new Tuple2<TrackEvent, RulesEvent>();
    }

    @Override
    public void flatMap2(RulesEvent rulesEvent, Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
        t2.f1 = rulesEvent;
        // collector.collect(t2);
    }
});

ds.printToErr();

Best,
I think the issue is that t2 is not registered as keyed state, so it is being shared across all of the keys on that TaskManager. Take a look at this article:

Basically, you need to change t2 to be a ValueState<Tuple2<TrackEvent, RulesEvent>> and register it with a ValueStateDescriptor in the function's open method. Then access it using t2.value() and t2.update(). Hopefully that helps.

Jason Brelloch | Product Developer

On Thu, May 4, 2017 at 9:17 AM, Tarek khal <[hidden email]> wrote:
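To make the point about shared vs. keyed state concrete, here is a minimal plain-Java model (no Flink dependency; all class and method names are hypothetical) of one parallel instance of the CoFlatMapFunction above. A single field, like t2.f1, is overwritten by every rule the instance receives, whatever its imei. A per-key map stands in for keyed ValueState, which Flink scopes to the key of the element currently being processed. This only sketches the semantics; in the real job the rule would live in a ValueState obtained via getRuntimeContext().getState(...) in open() (e.g. in a RichCoFlatMapFunction).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink dependency) of ONE parallel instance of the
// CoFlatMapFunction from the question. It sees rules and tracking events for
// every imei routed to it. All names are hypothetical.
class SharedVsKeyedState {

    interface Joiner {
        void onRule(String imei, String rule); // plays the role of flatMap2
        String onTrack(String imei);           // plays the role of flatMap1: rule paired with the event
    }

    // The question's pattern: one field (like t2.f1) shared by all keys.
    static final class SharedFieldJoiner implements Joiner {
        private String rule;
        public void onRule(String imei, String rule) { this.rule = rule; }
        public String onTrack(String imei) { return rule; }
    }

    // The suggested fix: one slot per key, which is what keyed ValueState
    // provides (value()/update() always refer to the current element's key).
    static final class KeyedStateJoiner implements Joiner {
        private final Map<String, String> ruleByImei = new HashMap<>();
        public void onRule(String imei, String rule) { ruleByImei.put(imei, rule); }
        public String onTrack(String imei) { return ruleByImei.get(imei); }
    }

    // Rules for imei 01, 02, 03 arrive first (as in the question), then tracking events.
    static List<String> run(Joiner joiner) {
        for (String imei : new String[] {"01", "02", "03"}) {
            joiner.onRule(imei, "rule-" + imei);
        }
        List<String> out = new ArrayList<>();
        for (String imei : new String[] {"01", "02", "04"}) {
            out.add(imei + "=" + joiner.onTrack(imei));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new SharedFieldJoiner())); // [01=rule-03, 02=rule-03, 04=rule-03]
        System.out.println(run(new KeyedStateJoiner()));  // [01=rule-01, 02=rule-02, 04=null]
    }
}
```

With the shared field, every tracking event is paired with rule-03, the last rule seen; with per-key state, 01 and 02 get their own rules and 04, which has no rule, gets none.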
Hi Jason,
Thank you very much for your help; it solved my problem.

Best regards,
In reply to this post by Jason Brelloch
But what happens if some data can never be merged?
Will the state be kept forever? Can I set a timeout?
Hi,
Yes, if you use a ProcessFunction or CoProcessFunction, you can (and should) set a timer to clean up state.

Best,
Aljoscha

> On 7. May 2017, at 09:28, yunfan123 <[hidden email]> wrote:
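A minimal plain-Java sketch of the timer-based cleanup described above, assuming event-time timers and a fixed time-to-live. The class, the `ttl` parameter, the `emitted` list (standing in for the Collector) and the `advanceWatermark` driver are hypothetical stand-ins, not Flink API. Each key's value is stored together with its last-modified time, processElement registers a timer, and onTimer emits the value and then removes it. The comparison against lastModified skips timers made stale by a newer element for the same key, similar to the pattern in the Flink ProcessFunction documentation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Flink dependency) of a keyed (Co)ProcessFunction that
// keeps one value per key and uses event-time timers to clean it up.
// All names are hypothetical; they only mirror the Flink concepts.
class TimerCleanupModel {
    // Per-key state: the value plus the event time it was last written.
    private static final class Entry {
        final String value;
        final long lastModified;
        Entry(String value, long lastModified) {
            this.value = value;
            this.lastModified = lastModified;
        }
    }

    private final long ttl;
    private final Map<String, Entry> state = new HashMap<>();           // stands in for keyed ValueState
    private final TreeMap<Long, List<String>> timers = new TreeMap<>(); // timestamp -> keys
    final List<String> emitted = new ArrayList<>();                     // stands in for the Collector

    TimerCleanupModel(long ttl) { this.ttl = ttl; }

    // Like processElement: store the value and register a cleanup timer.
    void processElement(String key, String value, long eventTime) {
        state.put(key, new Entry(value, eventTime));
        timers.computeIfAbsent(eventTime + ttl, t -> new ArrayList<>()).add(key);
    }

    // Like a watermark passing: fire every timer whose timestamp <= watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    // Like onTimer: emit, then remove the entry (state.clear() in Flink).
    // Nothing else ever removes it.
    private void onTimer(String key, long timestamp) {
        Entry e = state.get(key);
        // Skip stale timers: a newer element re-armed this key after this timer was set.
        if (e != null && timestamp == e.lastModified + ttl) {
            emitted.add(key + "=" + e.value);
            state.remove(key);
        }
    }

    int stateSize() { return state.size(); }

    public static void main(String[] args) {
        TimerCleanupModel m = new TimerCleanupModel(100);
        m.processElement("01", "a", 0);   // timer at 100
        m.processElement("02", "b", 50);  // timer at 150
        m.processElement("01", "c", 80);  // timer at 180; the timer at 100 is now stale
        m.advanceWatermark(120);
        System.out.println(m.emitted + " size=" + m.stateSize()); // [] size=2
        m.advanceWatermark(200);
        System.out.println(m.emitted + " size=" + m.stateSize()); // [02=b, 01=c] size=0
    }
}
```

In a real (Co)ProcessFunction the removal would be a call to clear() (or update(null)) on the ValueState inside onTimer.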
private static class MergeFunction extends RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer, ObjectNode>> {

    private ValueState<Tuple2<Integer, ObjectNode>> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("mystate", TypeInformation.of(getTypeOfTuple)));
    }

    @Override
    public void processElement(Tuple2<Integer, ObjectNode> tuple2, Context context, Collector<Tuple2<Integer, ObjectNode>> collector) throws Exception {
        //XXXXXX
        context.timerService().registerEventTimeTimer(System.currentTimeMillis() + interval);
    }

    @Override
    public void onTimer(long l, OnTimerContext onTimerContext, Collector<Tuple2<Integer, ObjectNode>> collector) throws Exception {
        if (state.value() != null) {
            collector.collect(state.value());
        }
    }
}

In my understanding, if I set a timer in my class that extends RichProcessFunction, the MergeFunction object can be garbage-collected once the onTimer function has been called. If I don't set an event-time timer in processElement, it can be garbage-collected after the processElement function ends.
In the example above, it seems I should clear the state in the onTimer function in order to free resources, as follows:

public void onTimer(long l, OnTimerContext onTimerContext, Collector<Tuple2<Integer, ObjectNode>> collector) throws Exception {
    if (state.value() != null) {
        collector.collect(state.value());
        state.update(null);
    }
}
Yes, that looks right.
> On 10. May 2017, at 14:56, yunfan123 <[hidden email]> wrote:
Hello
I am a little confused about when the state will be garbage-collected. For example:

Example 1:

class abc extends RichProcessFunction<Tuple<>, Tuple<>> {
    public void processElement(......) {
        if (timer never set) {
            ctx.timerService().registerEventTimeTimer(...);
        }
    }

    public void onTimer(.....) {
        // do some work ....
        ctx.timerService().registerEventTimeTimer(...);
    }
}

In Example 1, will the state ever be garbage-collected? Also, in Example 1 we register the event-time timer only once in processElement. Will the state be garbage-collected when the second event arrives?

And if we have:

Example 2:

public void onTimer(.....) {
    // do some work ....
    // no timer registration
}

Will the state be garbage-collected after onTimer completes?
In reply to this post by Tarek khal
@Jason I think there's a mistake in your explanation: each task in the TaskManager has its own copy of the operator instance, so the tuple may not be shared. State is a great solution, but I don't think it's the root cause.

@Tarek What's the parallelism of your data stream? I think the reason may be that the parallelism is 1.

--
Liu, Renjie
Software Engineer, MVAD

On Thu, May 4, 2017 at 10:39 PM Tarek khal <[hidden email]> wrote:
Hello Renjie,
Yes, the parallelism is 1. What should I do?

Regards,
Jason's solution is right; I'm just clarifying the mistake in the explanation.

On Fri, May 19, 2017 at 7:11 PM, Tarek khal <[hidden email]> wrote:
If I increase the operator parallelism, do I risk breaking the shared-state solution, or does it have nothing to do with it?
And if increasing it is an advantage, what is it limited by? I am new to this framework and I find some of the notions difficult.

Best regards,
Even if you increase the operator parallelism, you can still use state.

On Fri, May 19, 2017 at 7:47 PM Tarek khal <[hidden email]> wrote:
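A simplified model of how keyBy spreads keys over parallel instances may help with the parallelism question. It is only a sketch: Flink actually assigns keys to key groups using a murmur hash and the max parallelism, so `Math.floorMod(key.hashCode(), parallelism)` below is a hypothetical stand-in, and `Subtask`, `route` and `subtaskFor` are illustrative names. Each `Subtask` holds one plain field, like t2, and a per-key map, like keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of keyBy + parallelism. NOT Flink's real key-group assignment;
// floorMod(hashCode) is only a stand-in to show that one instance serves many keys.
class KeyPartitioningModel {

    // One parallel instance of the operator.
    static final class Subtask {
        String sharedField;                                     // like a plain field (t2)
        final Map<String, String> keyedState = new HashMap<>(); // like keyed ValueState
    }

    // Hypothetical routing: every element with the same key goes to the same subtask.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static List<Subtask> route(List<String> imeis, int parallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        for (String imei : imeis) {
            Subtask s = subtasks.get(subtaskFor(imei, parallelism));
            s.sharedField = "rule-" + imei;         // overwritten by every key on this subtask
            s.keyedState.put(imei, "rule-" + imei); // kept separately for each key
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> imeis = List.of("01", "02", "03");
        for (int p : new int[] {1, 2}) {
            List<Subtask> subtasks = route(imeis, p);
            for (int i = 0; i < p; i++) {
                Subtask s = subtasks.get(i);
                System.out.println("parallelism=" + p + " subtask=" + i
                        + " sharedField=" + s.sharedField + " keyedState=" + s.keyedState);
            }
        }
    }
}
```

With String.hashCode(), "01" and "03" both land on subtask 1 when the parallelism is 2, so the plain field still mixes their rules; raising the parallelism only changes which keys collide. The per-key map stays correct at any parallelism, which is why the fix is keyed state rather than a different parallelism.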
In reply to this post by gaurav
Hi,
The state will never be garbage-collected automatically. You have to clear it yourself in the onTimer() callback, as mentioned earlier.

Best,
Aljoscha

> On 19. May 2017, at 10:39, gaurav <[hidden email]> wrote:
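To illustrate that answer against gaurav's two examples, here is a plain-Java model (no Flink dependency; all names are hypothetical). The keyed state lives in a store owned by the runtime rather than in the function object, so neither firing timers nor re-registering them frees it; only an explicit removal in onTimer does. `OnTimer.REREGISTER` mirrors Example 1, `DO_NOTHING` mirrors Example 2, and `CLEAR_STATE` is the recommended cleanup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Flink dependency): keyed state lives in a store owned by
// the runtime, not in the function object, so timers alone never free it.
// All names are hypothetical.
class StateLifetimeModel {
    enum OnTimer { REREGISTER, DO_NOTHING, CLEAR_STATE }

    private final OnTimer behavior;
    private final long interval;
    private final Map<String, String> state = new HashMap<>();          // keyed "ValueState"
    private final TreeMap<Long, List<String>> timers = new TreeMap<>(); // timestamp -> keys
    int firings = 0;

    StateLifetimeModel(OnTimer behavior, long interval) {
        this.behavior = behavior;
        this.interval = interval;
    }

    void processElement(String key, String value, long time) {
        boolean timerNeverSet = !state.containsKey(key); // crude "if (timer never set)"
        state.put(key, value); // a second element just overwrites; nothing is freed
        if (timerNeverSet) {
            register(key, time + interval);
        }
    }

    private void register(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(key);
    }

    // Fire every timer whose timestamp <= watermark, including re-registered ones.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    private void onTimer(String key, long timestamp) {
        firings++;
        switch (behavior) {
            case REREGISTER:  // Example 1: the timer chain never ends
                register(key, timestamp + interval);
                break;
            case DO_NOTHING:  // Example 2: no more timers, but the state stays
                break;
            case CLEAR_STATE: // explicit cleanup, as recommended in the reply
                state.remove(key);
                break;
        }
    }

    int stateSize() { return state.size(); }
    int pendingTimers() { return timers.size(); }

    public static void main(String[] args) {
        for (OnTimer b : OnTimer.values()) {
            StateLifetimeModel m = new StateLifetimeModel(b, 100);
            m.processElement("01", "rule-01", 0);
            m.processElement("01", "rule-01b", 10); // second element for the same key
            m.advanceWatermark(1000);
            System.out.println(b + ": firings=" + m.firings
                    + " stateSize=" + m.stateSize() + " pendingTimers=" + m.pendingTimers());
        }
    }
}
```

Running main should report 10 firings with the state still present (and one timer pending) for REREGISTER, one firing with the state still present for DO_NOTHING, and one firing with the state removed for CLEAR_STATE.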