Parallelism, registerEventTimeTimer and watermark problem

Fritz Budiyanto
Hi All,

When I run with high parallelism and call registerEventTimeTimer in a ProcessFunction, the timer never fires.
After debugging, I found that the watermark isn't updated because I have keyBy right after assignTimestampsAndWatermarks.
If I instead put assignTimestampsAndWatermarks right after the keyBy, an exception is thrown:

val contractFlow = enrichedFlow
      .keyBy(f => f.fiveTupleKey)
      .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) // <<<<< moved after keyBy
      .process(new FlowContractStitcherProcess)
      .name("contractStitcher")

at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
        at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
        at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)


Any idea how to solve my problem? How do I update the watermark after keyBy?

Would I hit scaling issues with a large number of timers if I used registerProcessingTimeTimer instead? I'm using event time throughout the pipeline; could mixing processing-time timers with event time cause problems down the line?

--
Fritz
Re: Parallelism, registerEventTimeTimer and watermark problem

Fritz Budiyanto
Sorry, I forgot to paste the exception that is thrown:

10/17/2017 20:21:30 dropDetection -> (aggFlowDropDetectPrintln -> Sink: Unnamed, aggFlowDropDetectPrintln -> Sink: Unnamed, Sink: kafkaSink)(3/4) switched to CANCELED
20:21:30,244 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Aggregate flows (313a46d5fd23e4c2d0d00d0033950b6d) switched from state FAILING to FAILED.
java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
        at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:151)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:115)
        at FlowContractStitcherProcess.endState$lzycompute(FlowContractResolver.scala:30)
        at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
        at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
        at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)


--
Fritz



Re: Parallelism, registerEventTimeTimer and watermark problem

Aljoscha Krettek
Hi Fritz,

If the watermark is not updating this usually means that one of the input partitions (if you're using Kafka) is not carrying data. In that case, the watermark/timestamp assigner will have no data on which to base an updated watermark. For such use cases I recently implemented a special watermark/timestamp assigner that will notice if a stream is idle and will then artificially advance the watermark. The code for this is available here: https://github.com/aljoscha/flink/commit/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f
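To make the stall concrete, here is a toy model in plain Scala, not Flink code and not the implementation in the linked commit. It assumes that a downstream operator's watermark is the minimum of the watermarks of its input partitions, and that an idle partition is advanced to "wall clock minus allowed out-of-orderness" once it has been silent for a timeout. The names WatermarkModel, IdleAwareAssigner, and all parameters are hypothetical.

```scala
object WatermarkModel {
  // Watermark seen by a downstream operator: the minimum over all input partitions.
  def combined(partitionWatermarks: Seq[Long]): Long = partitionWatermarks.min

  // Hypothetical per-partition assigner that advances the watermark when the partition is idle.
  final class IdleAwareAssigner(maxOutOfOrdernessMs: Long, idleTimeoutMs: Long, startMs: Long) {
    private var maxTimestampMs: Option[Long] = None // highest event timestamp seen so far
    private var lastActivityMs: Long = startMs      // wall-clock time of the last event
    private var lastEmittedMs: Long = Long.MinValue // keeps the watermark monotonic

    def onEvent(eventTimestampMs: Long, nowMs: Long): Unit = {
      maxTimestampMs = Some(maxTimestampMs.fold(eventTimestampMs)(m => math.max(m, eventTimestampMs)))
      lastActivityMs = nowMs
    }

    def currentWatermark(nowMs: Long): Long = {
      val fromEvents = maxTimestampMs.map(_ - maxOutOfOrdernessMs).getOrElse(Long.MinValue)
      val candidate =
        if (nowMs - lastActivityMs >= idleTimeoutMs)
          // Assumption: event time roughly tracks wall-clock time for this stream.
          math.max(fromEvents, nowMs - maxOutOfOrdernessMs)
        else fromEvents
      lastEmittedMs = math.max(lastEmittedMs, candidate)
      lastEmittedMs
    }
  }

  def main(args: Array[String]): Unit = {
    val busy = new IdleAwareAssigner(1000L, 5000L, 0L)
    val idle = new IdleAwareAssigner(1000L, 5000L, 0L)

    // Only one partition carries data: the combined watermark is stuck at Long.MinValue,
    // so no event-time timer downstream can fire.
    busy.onEvent(3000L, 3000L)
    val stalled = combined(Seq(busy.currentWatermark(4000L), idle.currentWatermark(4000L)))
    assert(stalled == Long.MinValue)

    // Once the idle timeout has passed, the silent partition no longer holds the minimum back.
    busy.onEvent(6000L, 6000L)
    val advanced = combined(Seq(busy.currentWatermark(6000L), idle.currentWatermark(6000L)))
    assert(advanced == 5000L)
    println(s"stalled=$stalled advanced=$advanced")
  }
}
```

With higher parallelism there are more partitions feeding the minimum, so a single silent one is enough to keep every downstream timer from firing.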

Does this apply to your case?

Best,
Aljoscha

P.S. The exception is thrown because using timers/state is only allowed in an operation that directly follows a keyBy().
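The ordering that avoids this exception can be sketched as follows: assign timestamps and watermarks first, then keyBy, then call process directly on the keyed stream. This is only a sketch, assuming a Flink 1.3-era Scala DataStream API (which the stack trace suggests). The Flow type, FlowTimestamps, FireOneMinuteLater, the five-second bound, and the one-minute timer are hypothetical stand-ins for the classes in the original job.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Hypothetical event type standing in for the enriched flow records.
case class Flow(fiveTupleKey: String, timestampMs: Long)

// Hypothetical assigner: watermark trails the highest timestamp seen by 5 seconds.
class FlowTimestamps extends BoundedOutOfOrdernessTimestampExtractor[Flow](Time.seconds(5)) {
  override def extractTimestamp(flow: Flow): Long = flow.timestampMs
}

// Hypothetical function: registers an event-time timer one minute after each element.
class FireOneMinuteLater extends ProcessFunction[Flow, String] {
  override def processElement(flow: Flow,
                              ctx: ProcessFunction[Flow, String]#Context,
                              out: Collector[String]): Unit =
    ctx.timerService().registerEventTimeTimer(flow.timestampMs + 60000L)

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[Flow, String]#OnTimerContext,
                       out: Collector[String]): Unit =
    out.collect(s"timer fired at $timestamp")
}

object OrderingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val flows: DataStream[Flow] = env.fromElements(Flow("a", 1000L), Flow("b", 2000L))

    flows
      .assignTimestampsAndWatermarks(new FlowTimestamps) // returns a plain DataStream
      .keyBy(_.fiveTupleKey)                             // must come last before process
      .process(new FireOneMinuteLater)                   // timers and keyed state are available here
      .print()

    env.execute("ordering sketch")
  }
}
```

Because assignTimestampsAndWatermarks returns a non-keyed stream, placing it between keyBy and process discards the keying, which is what triggers the "Keyed state can only be used on a 'keyed stream'" error.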
