Error handling

Error handling

Nick Dimiduk
Heya,

I don't see a section in the online manual dedicated to this topic, so I want to raise the question here: how should errors be handled? Specifically, I'm thinking about streaming jobs, which are expected to "never go down". For example, errors can be raised where objects are serialized to/from sources and sinks, and inside UDFs. Cascading provides failure traps [0], where erroneous tuples are saved off for post-processing. Is there any such functionality in Flink?

I started down the road of implementing a Maybe/Optional type: a generic POJO triple of <IN, OUT, Throwable> for capturing errors at each stage of a pipeline. However, because of Java type erasure, the job compiles but is rejected at submission time.

How are other people handling errors in their stream processing?

Thanks,
Nick

[0]: http://docs.cascading.org/cascading/1.2/userguide/html/ch06s03.html
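
For readers who want to try the same approach, a minimal sketch of such a wrapper type follows; the class name, fields, and factory methods are illustrative assumptions, and only the <IN, OUT, Throwable> idea comes from the post above.

// Illustrative sketch: a POJO carrying either the result of one pipeline
// stage or the failure it produced, together with the offending input.
public class Maybe<IN, OUT> {

    public IN input;         // the element that was being processed
    public OUT output;       // set on success
    public Throwable error;  // set on failure; note that Throwable is not a
                             // valid POJO field type for Flink, so it falls
                             // back to generic (Kryo) serialization

    public Maybe() {}        // public no-arg constructor keeps this a Flink POJO

    public static <IN, OUT> Maybe<IN, OUT> ok(IN input, OUT output) {
        Maybe<IN, OUT> m = new Maybe<>();
        m.input = input;
        m.output = output;
        return m;
    }

    public static <IN, OUT> Maybe<IN, OUT> failed(IN input, Throwable error) {
        Maybe<IN, OUT> m = new Maybe<>();
        m.input = input;
        m.error = error;
        return m;
    }

    public boolean isError() {
        return error != null;
    }
}

Because of erasure, the generic parameters usually have to be declared explicitly at each use site, e.g. stream.map(fn).returns(TypeInformation.of(new TypeHint<Maybe<String, Event>>() {})), where Event is a hypothetical element type and TypeHint is API from later Flink releases.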

Re: Error handling

Aljoscha Krettek
Hi Nick,
these are some interesting ideas. I have been thinking about this; maybe we can add a special output stream (for example Kafka, but it could be generic) that would receive the errors/exceptions thrown during processing. The actual processing would not stop, and the messages in this special stream would contain some information about the current state of processing, the input element(s), and the machine/VM where the computation is happening.

In Cascading you get the erroneous tuples at the end. That is not possible in streaming, hence the more stream-y approach/solution.

What do you think about that?

Cheers,
Aljoscha
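
For what it is worth, the side-output API that later Flink releases added makes roughly this pattern expressible in user code. A hedged sketch, assuming raw is a DataStream<String> and that Event and its parse method are hypothetical:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Elements that fail to parse are diverted to a side output; processing continues.
final OutputTag<String> errorTag = new OutputTag<String>("parse-errors") {};

SingleOutputStreamOperator<Event> parsed =
    raw.process(new ProcessFunction<String, Event>() {
        @Override
        public void processElement(String value, Context ctx, Collector<Event> out) {
            try {
                out.collect(Event.parse(value)); // hypothetical parser
            } catch (Exception e) {
                // Keep the raw input together with the error so nothing is lost.
                ctx.output(errorTag, value + " | " + e);
            }
        }
    });

DataStream<String> errors = parsed.getSideOutput(errorTag);
errors.print(); // stand-in for a real error sink, e.g. a Kafka "errors" topic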

Re: Error handling

Nick Dimiduk
> I have been thinking about this; maybe we can add a special output stream (for example Kafka, but it could be generic) that would receive the errors/exceptions thrown during processing. The actual processing would not stop, and the messages in this special stream would contain some information about the current state of processing, the input element(s), and the machine/VM where the computation is happening.

Yes, this is precisely what I have in mind. The goal is (1) to not lose input data, and (2) to make errors available for operator visibility.

It's not very portable, but I was able to implement my Maybe<IN, OUT, Throwable> type. I can now use it as the output of all my source streams and split those streams on the presence of the Throwable. With this, I'm able to trap certain forms of invalid input and send them to an errors sink. However, there are still some error cases that raise exceptions, apparently outside of my UDF's try block, which cause the whole streaming job to terminate.
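
For illustration, the split can look roughly like this with the split/select API of that era (since deprecated); maybes is assumed to be a DataStream of the Maybe type sketched earlier, and Event is hypothetical:

import java.util.Collections;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;

// Route each element by whether its Maybe wrapper carries an error.
SplitStream<Maybe<String, Event>> split = maybes.split(
    new OutputSelector<Maybe<String, Event>>() {
        @Override
        public Iterable<String> select(Maybe<String, Event> m) {
            return Collections.singletonList(m.isError() ? "errors" : "good");
        }
    });

DataStream<Maybe<String, Event>> good = split.select("good");     // continues the pipeline
DataStream<Maybe<String, Event>> errors = split.select("errors"); // feeds the errors sink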
 

Re: Error handling

Stephan Ewen
Hi Nick!

The errors outside your UDF (such as network problems) will be handled by Flink and cause the job to go into recovery. They should be transparently handled.

Just make sure you activate checkpointing for your job!

Stephan
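
For reference, checkpointing is activated on the execution environment; the interval below is an arbitrary example:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Draw a consistent snapshot every 5 seconds; on failure the job rolls
// back to the last completed checkpoint instead of losing its state.
env.enableCheckpointing(5000);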


Re: Error handling

Aljoscha Krettek
Hi,
I don't think that alleviates the problem. Sometimes you might want the system to continue even if something outside the UDF fails, for example if a serializer does not work because of a null value somewhere. You would, however, still like to get a message about it somewhere, I assume.

Cheers,
Aljoscha

Re: Error handling

Stephan Ewen
Makes sense. The class of operations that work "per-tuple" before the data is forwarded to the network stack could be extended to have error traps.

@Nick: Is that what you had in mind?

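As a sketch of what such a user-level trap could look like (not a proposed Flink API; all names here are made up), a throwing UDF can be wrapped in a FlatMapFunction that never lets exceptions escape, reusing the Maybe type sketched earlier in the thread:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// A throwing per-element function, supplied by the user.
interface ThrowingMapper<IN, OUT> extends java.io.Serializable {
    OUT map(IN value) throws Exception;
}

// Wraps the user function so that failures become data instead of job failures.
public class TrappingFlatMap<IN, OUT> implements FlatMapFunction<IN, Maybe<IN, OUT>> {

    private final ThrowingMapper<IN, OUT> inner;

    public TrappingFlatMap(ThrowingMapper<IN, OUT> inner) {
        this.inner = inner;
    }

    @Override
    public void flatMap(IN value, Collector<Maybe<IN, OUT>> out) {
        try {
            out.collect(Maybe.ok(value, inner.map(value)));
        } catch (Exception e) {
            // The trap: keep the input and the error, and keep the job running.
            out.collect(Maybe.failed(value, e));
        }
    }
}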

Re: Error handling

Nick Dimiduk
> The errors outside your UDF (such as network problems) will be handled by Flink and cause the job to go into recovery. They should be transparently handled.

Is that so? I've been able to feed bad data onto my Kafka topic and cause the streaming job to abort. You're saying this should not be the case? I have not started exploring high availability or checkpointing, so let me spend some time on those features before filing tickets.

> Sometimes you might want the system to continue even if something outside the UDF fails, for example if a serializer does not work because of a null value somewhere. You would, however, still like to get a message about it somewhere, I assume.

Exactly right.

> Makes sense. The class of operations that work "per-tuple" before the data is forwarded to the network stack could be extended to have error traps.
>
> @Nick: Is that what you had in mind?

Yes, that sounds like what I'd be looking for.

Re: Error handling

rmetzger0
Hi Nick,

regarding the Kafka example: what happens is that the FlinkKafkaConsumer will throw an exception. The JobManager then cancels the entire job and restarts it. It will then try to continue reading from the last valid checkpoint or from the consumer offset in ZooKeeper. Since the data in the topic is still corrupt, the job will fail again, and so on.

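One way out of that loop is to make the DeserializationSchema itself swallow corrupt records: newer Flink documentation notes that returning null from deserialize() makes the Kafka consumer skip the record. A sketch, where Event and its parse method are hypothetical and the DeserializationSchema package has moved between Flink versions:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class SkipCorruptSchema implements DeserializationSchema<Event> {

    @Override
    public Event deserialize(byte[] message) {
        try {
            return Event.parse(new String(message, StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Returning null tells the Flink Kafka consumer to skip this
            // record instead of failing the job and re-reading the same
            // poison pill after every restart.
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false; // the stream never ends
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}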

Re: Error handling

criss
Hi,

I have this architecture: Kafka topic -> Flink Kafka stream -> Flink custom sink that saves data into a PostgreSQL database.
To test how the system behaves when an error occurs, I did the following: I activated checkpoints on my DataStream, put one item with a special value in some field onto the Kafka topic, and threw an error when processing that item.
What I discovered:
- When the error is thrown inside the DeserializationSchema implementation, everything is fine; the job recovers as the documentation says.
- BUT when the error is thrown inside the invoke method of the RichSinkFunction, no recovery happens, and no further items are processed even though the Kafka consumer is working fine. Is this normal?
Here are some logs:
2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
2016-11-15 17:44:20,036 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)

Re: Error handling

Aljoscha Krettek
Hi,
is that the complete stack trace or is there more to it? I cannot really see where the exception originates.

Cheers,
Aljoscha


Re: Error handling

criss
Hi,

Here is the code that triggers the error (part of the sink):
@Override
public void invoke(KafkaLog value) throws Exception {
        ......................
        if (arg instanceof String && "error".equals((String)arg)) {
                throw new IOException("search for error");
        }
        ...........................
}

And here's the entire stack trace that I have from the log file:

2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: search for error
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        ... 18 more
2016-11-15 17:44:20,036 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
        ... 7 more
Caused by: java.io.IOException: search for error
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        ... 18 more
2016-11-15 17:44:20,037 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(TumblingProcessingTimeWindows(10000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@72b9f6df}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: Unnamed (1/2) switched to FAILED with exception.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
        ... 7 more
Caused by: java.io.IOException: search for error
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        ... 18 more

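For comparison, a hedged sketch of the same sink with the failure trapped inside invoke(), so that a bad record is logged (or diverted) instead of failing the task; write() stands in for the elided JDBC logic, and KafkaLog is the type from the post above:

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TrappingPostgreSqlSink extends RichSinkFunction<KafkaLog> {

    private static final Logger LOG = LoggerFactory.getLogger(TrappingPostgreSqlSink.class);

    @Override
    public void invoke(KafkaLog value) throws Exception {
        try {
            write(value); // hypothetical stand-in for the real JDBC insert
        } catch (Exception e) {
            // Trap instead of rethrow: only this record is affected and the
            // task keeps running. A dead-letter sink would be more robust
            // than a log line, but the shape is the same.
            LOG.error("Dropping record after sink failure: {}", value, e);
        }
    }

    private void write(KafkaLog value) throws Exception {
        // ... JDBC logic elided, as in the original post ...
    }
}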

Re: Error handling

Aljoscha Krettek
Hmm, I still don't know what could be causing this. Which version of Flink are you using? Also, when you say

> BUT when the error is thrown inside the invoke method of the RichSinkFunction, no recovery happens, and no further items are processed even though the Kafka consumer is working fine.

you mean that the job simply stops and does not try to restart, right? Have you set anything as the restart strategy?

Cheers,
Aljoscha
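
For completeness, a restart strategy is configured on the environment roughly like this (the values are arbitrary examples):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // recovery needs checkpointing to restore state
// Retry the job up to 3 times with 10 seconds between attempts. Without
// checkpointing or an explicit strategy, a failed job simply stays failed.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));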


Re: Error handling

vishnuviswanath
In reply to this post by Nick Dimiduk
Hi,

Wondering if any of these ideas were implemented after the discussion?

Thanks,
Vishnu

Re: Error handling

Chesnay Schepler
I'm not aware of any changes made in this direction.

On 08.05.2018 23:30, Vishnu Viswanath wrote:
> Wondering if any of these ideas were implemented after the discussion?