Flink kinesis connector crashes with daemon exception

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink kinesis connector crashes with daemon exception

Rahul Jain
We are using flink 1.7.2 on debian slim, and kubernetes as the resource manager. But when we deploy it, it works for an hour or so without any issues and then starts failing with the following error:

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:37)
    at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:33)
    at scala.collection.immutable.Stream.foreach(Stream.scala:594)
    at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:33)
    at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:13)
    at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:176)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.invoke(FlinkKinesisProducer.java:293)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    ... 26 more

We searched through the mailing list archives and this seems to have been a case with Flink 1.4 and older versions of KPL client. But we are seeing this with the latest version as well.

Does anyone know why this is happening? 

Thanks,
Rahul