Heya,
I don't see a section in the online manual dedicated to this topic, so I want to raise the question here: how should errors be handled? Specifically, I'm thinking about streaming jobs, which are expected to "never go down". For example, errors can be raised where objects are serialized to/from sources and sinks, and inside UDFs. Cascading provides failure traps [0], where erroneous tuples are saved off for post-processing. Is there any such functionality in Flink?

I started down the road of implementing a Maybe/Optional type, a generic POJO triple of <IN, OUT, Throwable>, for capturing errors at each stage of a pipeline. However, because of Java type erasure, even though it compiles, the job is rejected at submission time.

How are other people handling errors in their stream processing?

Thanks,
Nick

[0]: http://docs.cascading.org/cascading/1.2/userguide/html/ch06s03.html
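A minimal sketch of the kind of wrapper type described above; the class, field, and factory names are illustrative, not from the thread:

    import java.io.Serializable;

    // Hypothetical Maybe-style wrapper carrying either a result or the
    // failure that produced it, plus the offending input element.
    public class Maybe<IN, OUT> implements Serializable {
        public IN input;
        public OUT output;
        public Throwable error;

        public Maybe() {}  // no-arg constructor so Flink treats it as a POJO

        public static <IN, OUT> Maybe<IN, OUT> success(IN in, OUT out) {
            Maybe<IN, OUT> m = new Maybe<>();
            m.input = in;
            m.output = out;
            return m;
        }

        public static <IN, OUT> Maybe<IN, OUT> failure(IN in, Throwable t) {
            Maybe<IN, OUT> m = new Maybe<>();
            m.input = in;
            m.error = t;
            return m;
        }

        public boolean isError() { return error != null; }
    }

As for the submission-time rejection: because of erasure, a stream of such a generic type usually needs an explicit type hint at the use site, e.g. .returns(TypeInformation.of(new TypeHint<Maybe<String, MyEvent>>() {})) in Flink versions that provide TypeHint.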
Hi Nick,
these are some interesting ideas. I have been thinking about this; maybe we can add a special output stream (for example Kafka, but it could be generic) that would receive errors/exceptions that were thrown during processing. The actual processing would not stop, and the messages in this special stream would contain some information about the current state of processing, the input element(s), and the machine/VM where the computation is happening. In Cascading you get the erroneous tuples at the end. This is not possible in streaming, hence the more stream-y approach/solution. What do you think about that?

Cheers,
Aljoscha
> maybe we can add a special output stream (for example Kafka, but it could be generic) that would receive errors/exceptions that were thrown during processing. The actual processing would not stop, and the messages in this special stream would contain some information about the current state of processing, the input element(s), and the machine/VM where the computation is happening.

Yes, this is precisely what I have in mind. The goal is (1) to not lose input data, and (2) to make errors available for operator visibility.

It's not very portable, but I was able to implement my Maybe<IN, OUT, Throwable> type. I can now use it as the output of all my source streams and split those streams on the presence of the Throwable. With this, I'm able to trap certain forms of invalid input and send it to an errors sink. However, there are still some error cases that raise exceptions apparently outside of my UDF try block, and those cause the whole streaming job to terminate.
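One way to realize the split described here, sketched with plain filters over the wrapped stream; MySource, ParseWithTrap, Event, and ErrorSink are placeholders, and Maybe is the hypothetical wrapper sketched earlier:

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // ParseWithTrap is assumed to catch its own exceptions and emit
    // Maybe.success(...) or Maybe.failure(...) accordingly.
    DataStream<Maybe<String, Event>> parsed =
        env.addSource(new MySource()).map(new ParseWithTrap());

    // Success branch: unwrap and continue the normal pipeline.
    DataStream<Event> good = parsed
        .filter(new FilterFunction<Maybe<String, Event>>() {
            @Override
            public boolean filter(Maybe<String, Event> m) { return !m.isError(); }
        })
        .map(new MapFunction<Maybe<String, Event>, Event>() {
            @Override
            public Event map(Maybe<String, Event> m) { return m.output; }
        });

    // Error branch: route trapped failures to a dedicated errors sink,
    // e.g. a Kafka "errors" topic as suggested above.
    parsed
        .filter(new FilterFunction<Maybe<String, Event>>() {
            @Override
            public boolean filter(Maybe<String, Event> m) { return m.isError(); }
        })
        .addSink(new ErrorSink());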
Hi Nick!

The errors outside your UDF (such as network problems) will be handled by Flink and cause the job to go into recovery. They should be transparently handled. Just make sure you activate checkpointing for your job!

Stephan
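Activating checkpointing is a one-liner on the execution environment; a minimal sketch, with an arbitrary example interval:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Take a checkpoint every 5 seconds; on failure, the job restarts from
    // the last completed checkpoint instead of losing in-flight state.
    env.enableCheckpointing(5000);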
Hi,
I don't think that alleviates the problem. Sometimes you might want the system to continue even if stuff outside the UDF fails. For example, if a serializer does not work because of a null value somewhere. You would, however, like to get a message about this somewhere, I assume.

Cheers,
Aljoscha
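One pattern for the serializer case is to trap failures inside the DeserializationSchema itself, so a bad record never reaches Flink's internals. A hedged sketch, reusing the hypothetical Maybe wrapper from earlier; Event and its String constructor are placeholders, and the DeserializationSchema import path varies across Flink versions:

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    // A schema that never throws: parse failures are wrapped, keeping the
    // raw bytes so the bad record can be inspected or replayed later.
    public class TrappingSchema implements DeserializationSchema<Maybe<byte[], Event>> {

        @Override
        public Maybe<byte[], Event> deserialize(byte[] message) {
            try {
                return Maybe.success(message, parseEvent(message));
            } catch (Exception e) {
                return Maybe.failure(message, e);
            }
        }

        @Override
        public boolean isEndOfStream(Maybe<byte[], Event> next) {
            return false;
        }

        @Override
        public TypeInformation<Maybe<byte[], Event>> getProducedType() {
            return TypeInformation.of(new TypeHint<Maybe<byte[], Event>>() {});
        }

        private Event parseEvent(byte[] bytes) throws Exception {
            // placeholder for real decoding (JSON, Avro, ...)
            return new Event(new String(bytes, StandardCharsets.UTF_8));
        }
    }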
Makes sense. The class of operations that work "per-tuple" before the data is forwarded to the network stack could be extended to have error traps.

@Nick: Is that what you had in mind?
> The errors outside your UDF (such as network problems) will be handled by Flink and cause the job to go into recovery. They should be transparently handled.

Is that so? I've been able to feed bad data onto my Kafka topic and cause the stream job to abort. You're saying this should not be the case? I have not started exploring high availability or checkpointing, so let me spend some time on those features before filing tickets.

> Sometimes you might want the system to continue even if stuff outside the UDF fails. For example, if a serializer does not work because of a null value somewhere. You would, however, like to get a message about this somewhere, I assume.

Exactly right.
Yes, that sounds like what I'd be looking for.
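Pending any built-in traps, a user-level approximation is to wrap a UDF so exceptions are caught per tuple; a sketch, again using the hypothetical Maybe wrapper:

    import org.apache.flink.api.common.functions.MapFunction;

    // Wraps an arbitrary MapFunction so any exception it throws is trapped
    // into a Maybe.failure record instead of failing the task.
    public class TrappingMap<IN, OUT> implements MapFunction<IN, Maybe<IN, OUT>> {

        private final MapFunction<IN, OUT> inner;

        public TrappingMap(MapFunction<IN, OUT> inner) {
            this.inner = inner;
        }

        @Override
        public Maybe<IN, OUT> map(IN value) {
            try {
                return Maybe.success(value, inner.map(value));
            } catch (Exception e) {
                return Maybe.failure(value, e);
            }
        }
    }

Because of erasure, the wrapped stream again needs an explicit .returns(...) hint downstream, which circles back to the submission-time issue from the start of the thread.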
Hi Nick,

regarding the Kafka example: what happens is that the FlinkKafkaConsumer will throw an exception. The JobManager then cancels the entire job and restarts it. It will then try to continue reading from the last valid checkpoint or from the consumer offset in ZooKeeper. Since the data in the topic is still corrupt, the job will fail again ...
Hi,
I have this architecture: Kafka topic -> Flink Kafka stream -> Flink custom sink that saves data into a PostgreSQL database. To test how the system behaves if an error occurs, I've done the following: activate checkpoints on my DataStream, put one item with a special value in some field on the Kafka topic, and throw an error when processing that item.

What I have discovered:
- When the error is thrown inside the DeserializationSchema implementation, everything is fine; the job is recovered as the documentation says.
- BUT when the error is thrown inside the invoke method of a RichSinkFunction, no recovery is done and no further items are processed, even though the Kafka consumer is working fine.

Is this normal? Here are some logs:

2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
2016-11-15 17:44:20,036 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
Hi,

is that the complete stack trace, or is there more to it? I cannot really see where the exception originates.

Cheers,
Aljoscha
Hi,
Here is the code which triggers the error (part of the sink):

    @Override
    public void invoke(KafkaLog value) throws Exception {
        ......................
        if (arg instanceof String && "error".equals((String) arg)) {
            throw new IOException("search for error");
        }
        ...........................
    }

And here's the entire stack trace that I have from the log file:

2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: search for error
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 18 more
2016-11-15 17:44:20,036 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
    ... 7 more
Caused by: java.io.IOException: search for error
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 18 more
2016-11-15 17:44:20,037 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(TumblingProcessingTimeWindows(10000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@72b9f6df}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: Unnamed (1/2) switched to FAILED with exception.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
    ... 7 more
Caused by: java.io.IOException: search for error
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 18 more
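For completeness, one user-level way to keep a job alive in this scenario is to trap per-record failures inside the sink itself rather than letting them propagate; a hedged sketch, where KafkaLog is the poster's own type and writeToPostgres is a placeholder for the real JDBC logic:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // A sink that catches per-record failures instead of letting them fail
    // the whole task, as happens in the logs above.
    public class TrappingPostgresSink extends RichSinkFunction<KafkaLog> {

        private static final Logger LOG =
            LoggerFactory.getLogger(TrappingPostgresSink.class);

        @Override
        public void invoke(KafkaLog value) throws Exception {
            try {
                writeToPostgres(value);
            } catch (Exception e) {
                // Trap and record the failure; a real deployment would likely
                // persist to a dead-letter topic rather than only logging.
                LOG.error("Sink failed for record {}", value, e);
            }
        }

        private void writeToPostgres(KafkaLog value) throws Exception {
            // placeholder for the real INSERT/UPSERT logic
        }
    }

Note the trade-off: the job no longer fails, but sink errors are silently absorbed, so the dead-letter path needs its own monitoring.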
Hmm, I still don't know what could be causing this. Which version of Flink are you using?

Also, when you say "BUT when the error is thrown inside the invoke method of a RichSinkFunction, no recovery is done and no further items are processed, even though the Kafka consumer is working fine", you mean that the job simply stops and does not try restarting, right? Have you set anything as the restart strategy?

Cheers,
Aljoscha
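A minimal sketch of setting a restart strategy, with arbitrary example values; without one (and depending on the configured default), a failed job simply stays failed:

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Retry the job up to 3 times, waiting 10 seconds between attempts.
    env.setRestartStrategy(
        RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));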
Hi,
Wondering if any of these ideas were implemented after the discussion?

Thanks,
Vishnu
Was referring to the original email thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-handling-td3448.html
I'm not aware of any changes made in this direction.