Hi All,
If I have high parallelism and use a ProcessFunction to call registerEventTimeTimer, the timer never fires.
After debugging, I found that the watermark isn't updated because I have keyBy right after assignTimestampsAndWatermarks.
And if I put assignTimestampsAndWatermarks right after the keyBy, an exception is thrown.

val contractFlow = enrichedFlow
  .keyBy(f => f.fiveTupleKey)
  .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) <<<<<
  .process(new FlowContractStitcherProcess)
  .name("contractStitcher")

    at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
    at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
    at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)

Any idea how to solve my problem? How do I update the watermark after keyBy?

Would I hit scaling issues with a large number of timers if I use registerProcessingTimeTimer instead? I'm using event time throughout the pipeline; would mixing processing-time timers with event time cause problems down the line?

--
Fritz
Sorry, I forgot to paste the exception that was thrown:
10/17/2017 20:21:30 dropDetection -> (aggFlowDropDetectPrintln -> Sink: Unnamed, aggFlowDropDetectPrintln -> Sink: Unnamed, Sink: kafkaSink)(3/4) switched to CANCELED
20:21:30,244 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Aggregate flows (313a46d5fd23e4c2d0d00d0033950b6d) switched from state FAILING to FAILED.
java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:151)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:115)
    at FlowContractStitcherProcess.endState$lzycompute(FlowContractResolver.scala:30)
    at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
    at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
    at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)

--
Fritz

> On Oct 17, 2017, at 7:55 PM, Fritz Budiyanto <[hidden email]> wrote:
>
> [...]
Hi Fritz,
If the watermark is not updating this usually means that one of the input partitions (if you're using Kafka) is not carrying data. In that case, the watermark/timestamp assigner will have no data on which to base an updated watermark.

For such use cases I recently implemented a special watermark/timestamp assigner that will notice if a stream is idle and will then artificially advance the watermark. The code for this is available here: https://github.com/aljoscha/flink/commit/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f

Does this apply to your case?

Best,
Aljoscha

P.S. The exception is thrown because using timers/state is only allowed in an operation that directly follows a keyBy().
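
To illustrate why a single idle input can keep event-time timers from ever firing, here is a minimal toy model in plain Scala. It is not Flink code: the object `WatermarkModel` and the helpers `operatorWatermark` and `firedTimers` are hypothetical names made up for this sketch, and the numbers are invented. It assumes only that an operator's event-time clock is the minimum of the watermarks it has received on its input channels, and that a channel which has never emitted a watermark sits at `Long.MinValue`.

```scala
// Toy model only (not Flink code): shows how one idle input channel
// holds back the watermark that a downstream operator sees.
object WatermarkModel {

  // Hypothetical helper: the operator's event-time clock is the minimum
  // of the latest watermark received on each input channel.
  def operatorWatermark(channelWatermarks: Seq[Long]): Long =
    channelWatermarks.min

  // Hypothetical helper: an event-time timer becomes eligible to fire
  // once the operator's watermark has reached the timer's timestamp.
  def firedTimers(timers: Seq[Long], watermark: Long): Seq[Long] =
    timers.filter(_ <= watermark)

  def main(args: Array[String]): Unit = {
    val timers = Seq(1000L, 2000L)

    // The third channel is idle and has never emitted a watermark,
    // so the minimum stays at Long.MinValue and no timer fires.
    val stalled = operatorWatermark(Seq(5000L, 4800L, Long.MinValue, 5100L))
    println(firedTimers(timers, stalled))

    // Once the idle channel catches up, the minimum moves forward
    // and both timers become eligible.
    val advanced = operatorWatermark(Seq(5000L, 4800L, 4900L, 5100L))
    println(firedTimers(timers, advanced))
  }
}
```

With high parallelism, a keyBy connects every downstream subtask to every upstream subtask, so one upstream instance that sees no data (for example, because there are more subtasks than Kafka partitions) may be enough to produce the stalled case above. That is the situation the idle-aware assigner is meant to address.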