Flink: KafkaProducer Data Loss

Flink: KafkaProducer Data Loss

ninad
Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's 'retry' mechanism doesn't kick in until a message is added to its internal buffer.

If there's an exception before that, KafkaProducer will throw that exception, and it seems that Flink isn't handling it. In this case there will be data loss.

Related Flink code (FlinkKafkaProducerBase):

if (logFailuresOnly) {
    callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
            }
            acknowledgeMessage();
        }
    };
}
else {
    callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null && asyncException == null) {
                asyncException = exception;
            }
            acknowledgeMessage();
        }
    };
}

Here are the scenarios we've identified that will cause data loss:

1) All Kafka brokers are down.

In this case, before appending a message to its buffer, KafkaProducer tries to fetch metadata. If the KafkaProducer isn't able to fetch the metadata within the configured timeout, it throws an exception.

2) Memory records are not writable (an existing bug in the Kafka 0.9.0.1 library):
https://issues.apache.org/jira/browse/KAFKA-3594

In both of the above cases, KafkaProducer won't retry, and Flink will ignore the messages. The messages aren't even logged: the exception is, but not the messages that failed.

Possible workarounds (Kafka settings):

1) A very high value for metadata timeout (metadata.fetch.timeout.ms)
2) A very high value for buffer expiry (request.timeout.ms)

We're still investigating the possible side effects of changing the above Kafka settings.
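For reference, this is roughly how such producer settings would be passed to the Flink Kafka sink. This is only a minimal sketch assuming the Kafka 0.9 connector and string records; the broker addresses, topic name, and timeout values are placeholders.

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerSettingsSketch {

    public static FlinkKafkaProducer09<String> buildSink() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        // Workaround 1: give the producer much longer to fetch metadata
        // while all brokers are unreachable.
        props.setProperty("metadata.fetch.timeout.ms", "600000");
        // Workaround 2: keep batches in the producer buffer longer before expiring them.
        props.setProperty("request.timeout.ms", "600000");
        return new FlinkKafkaProducer09<String>("output-topic", new SimpleStringSchema(), props);
    }
}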

So, is our understanding correct? Or is there a way we can avoid this data loss by modifying some Flink settings?

Thanks.

Re: Flink: KafkaProducer Data Loss

Till Rohrmann
Hi Ninad,

thanks for reporting the issue. To me it also looks as if exceptions might, under certain circumstances, go unnoticed. For example, a write operation that fails will set the asyncException field, which is not checked before the next invoke call happens. If a checkpoint happens in between, it will pass and mark all messages up to this point as successfully processed. Only after the checkpoint will the producer fail. And this constitutes data loss, imho.

I've looped in Robert and Gordon, who are more familiar with the Kafka producer. Maybe they can answer your and my questions.

Cheers,
Till


Re: Flink: KafkaProducer Data Loss

Tzu-Li (Gordon) Tai
Hi Ninad and Till,

Thank you for looking into the issue! This is actually a bug.

Till’s suggestion is correct:
The producer holds a `pendingRecords` value that is incremented on each invoke() and decremented on each callback, used to check if the producer needs to sync on pending callbacks on checkpoints.
On each checkpoint, we should only consider the checkpoint succeeded if, after flushing, `pendingRecords == 0` and `asyncException == null` (currently, we’re only checking `pendingRecords`).

A quick fix for this is to check and rethrow async exceptions in the `snapshotState` method both before and after flushing, i.e. after `pendingRecords` reaches 0.
I’ve filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-5701.
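To make the idea concrete, here is a rough, self-contained sketch of the pattern. This is illustrative only, not the actual FLINK-5701 patch, and the class and helper names are made up for the example:

import java.util.concurrent.atomic.AtomicLong;

// Sketch of a sink that surfaces asynchronous producer errors at checkpoint time.
public class AsyncErrorCheckingSinkSketch {

    private volatile Exception asyncException;                   // set by the producer callback on failure
    private final AtomicLong pendingRecords = new AtomicLong();  // +1 per record sent, -1 per callback

    // Called for every record handed to the producer.
    void onRecordSent() {
        pendingRecords.incrementAndGet();
    }

    // Called from the producer callback for every completed record.
    void onRecordCompleted(Exception error) {
        if (error != null && asyncException == null) {
            asyncException = error;
        }
        pendingRecords.decrementAndGet();
    }

    private void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    // Invoked when Flink snapshots this sink for a checkpoint.
    public void snapshotState() throws Exception {
        // 1) Rethrow any async error that happened before the checkpoint.
        checkErroneous();

        // 2) Wait until every in-flight record has been acknowledged.
        while (pendingRecords.get() > 0) {
            Thread.sleep(10);
        }

        // 3) Check again: an error may have surfaced while waiting. Throwing here
        //    fails the checkpoint instead of letting it succeed and drop data.
        checkErroneous();
    }
}

The real producer flushes the Kafka client rather than sleeping in a loop, but the two checkErroneous() calls around that sync point are the essence of the fix.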

Cheers,
Gordon


Re: Flink: KafkaProducer Data Loss

ninad
Thanks, Gordon and Till.

Re: Flink: KafkaProducer Data Loss

ninad
Thanks for the fix, guys. I am trying to test this with 1.1.5, but am still seeing data loss.
Here's our use case:

1) Consume from Kafka
2) Apply session window
3) Send messages of window to Kafka

If there's a failure in step 3 because all Kafka brokers are down, we see data loss.
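For reference, the job is essentially the following shape. This is a bare-bones sketch with placeholder topic names, brokers, and key extraction; it assumes string records and the Kafka 0.9 connector.

import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class EventFilterJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);  // checkpointing is required for the sink's at-least-once guarantee

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "broker1:9092");
        consumerProps.setProperty("group.id", "event-filter");

        // 1) Consume from Kafka
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer09<String>("topic.event.filter", new SimpleStringSchema(), consumerProps));

        // 2) Key the stream and apply a 30s processing-time session window
        DataStream<String> windowed = events
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value.split(",")[0];  // placeholder key extraction
                    }
                })
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
                .apply(new WindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) {
                        for (String record : input) {
                            out.collect(record);  // placeholder window logic
                        }
                    }
                });

        // 3) Send the window output to Kafka
        windowed.addSink(new FlinkKafkaProducer09<String>(
                "broker1:9092", "topic.http.stream.event.processor", new SimpleStringSchema()));

        env.execute("event-filter");
    }
}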

Here are relevant logs:

java.lang.Exception: Could not perform checkpoint 2 for operator TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th operator in chain.
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
        ... 8 more
Caused by: java.lang.Exception: Failed to snapshot function state of TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
        ... 9 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)


Re: Flink: KafkaProducer Data Loss

Kostas Kloudas
Hi Ninad,

I think that Gordon could shed some more light on this, but I suggest you update your Flink version to at least 1.2.

The reason is that we are already in the process of releasing Flink 1.3 (which will probably come out today), and a lot of things have changed/been fixed/improved since the 1.1 release. In fact, it would help us a lot if you could check whether your problem still exists in the upcoming 1.3 release.

In addition, I expect that the 1.1 release will soon no longer be supported.

Cheers,
Kostas


Re: Flink: KafkaProducer Data Loss

Tzu-Li (Gordon) Tai
Hi Ninad,

This exception you’re seeing does not cause data loss. As a matter of fact, it’s preventing data loss, given how Flink’s checkpoints / fault tolerance work.

So, a recap of what the problem was when this “uncaught exception leak” issue was first reported:
Prior to the fix, on checkpoints the Flink Kafka producer did not check for any async produce errors, therefore voiding the at-least-once guarantee of the sink.
In other words, the checkpoint was incorrectly succeeding without respecting that some previous data wasn’t sent to Kafka.

The fix included in 1.1.5 / 1.2.1 corrects this by rethrowing any async errors that occurred before the checkpoint and failing the checkpoint snapshot (which is what you are observing as this exception).

When a failure occurs in the job, Flink uses the last completed checkpoint to restart the job. In the case of the Flink Kafka producer, this essentially makes sure that records which did not make it into Kafka and caused the last run to fail are reprocessed and sent to Kafka again.
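As a side note, this assumes checkpointing and a restart strategy are enabled on the job, roughly along these lines (the interval and retry values below are arbitrary placeholders):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetupSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 10 seconds; on failure the job is rolled back
        // to the last completed checkpoint and everything after it is replayed.
        env.enableCheckpointing(10000);

        // Restart the job up to 3 times, waiting 10 seconds between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

        // ... build the pipeline and call env.execute() here ...
    }
}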

Hope this helps!

Cheers,
Gordon


Re: Flink: KafkaProducer Data Loss

ninad
Thanks Gordon and Kostas.

Gordon,

"When a failure occurs in the job, Flink uses the last completed checkpoint to restart the job. In the case of the Flink Kafka producer, this essentially makes sure that records which did not make it into Kafka and caused the last run to fail are reprocessed and sent to Kafka again."

This is exactly what we were expecting. Thanks for confirming. However, we still do not see messages in Kafka.
All the Kafka properties are as expected:

Replication: 3
Min ISR: 2
acks: all

We also tried this with Flink 1.2.1. We haven't yet tested this with a standalone configuration; we will, to see if the result is different.

That being said, we're running this on a Cloudera YARN/Hadoop cluster, but we haven't built Flink against the Cloudera binaries. The logs certainly don't indicate that this is the problem.

Please let us know how we can troubleshoot this.

I have attached the JobManager and TaskManager log files for reference.

Relevant logs from the logs files:

Job Manager

2017-06-01 20:22:44,499 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.

2017-06-01 20:22:44,530 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job event-filter (510a7a83f509adace6704e7f2caa0b75).
2017-06-01 20:22:44,534 INFO  org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter  - Delaying retry of job execution for 10000 ms ...
2017-06-01 20:22:48,233 DEBUG org.apache.flink.runtime.metrics.dump.MetricDumpSerialization  - Failed to serialize gauge.

2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph        - Resetting execution vertex Source: Custom Source (1/1) for new execution.
2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph        - Resetting execution vertex TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) for new execution.
2017-06-01 20:22:54,535 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RESTARTING to CREATED.
2017-06-01 20:22:54,536 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.
2017-06-01 20:22:54,591 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state CREATED to RUNNING.


Task Manager 1

2017-06-01 20:22:44,400 WARN  org.apache.kafka.clients.producer.internals.Sender            - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS

2017-06-01 20:22:44,426 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).

2017-06-01 20:22:44,427 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}

Task Manager 2

2017-06-01 20:22:54,741 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition  - Source: Custom Source (1/1) (8ee2c8a628968bc3f8006f0740bb8ad1): Initialized ResultPartition 8d68b9c00d6a329d70ee2bf1ed320318@8ee2c8a628968bc3f8006f0740bb8ad1 [PIPELINED, 1 subpartitions, 1 pending references]
2017-06-01 20:22:54,760 INFO  org.apache.flink.yarn.YarnTaskManager                         - Received task Source: Custom Source (1/1)

2017-06-01 20:27:30,388 WARN  org.apache.kafka.clients.NetworkClient                        - Error while fetching metadata with correlation id 1 : {topic.event.filter=LEADER_NOT_AVAILABLE}

jobManager.log tmOne.log tmTwo.log

Re: Flink: KafkaProducer Data Loss

Tzu-Li (Gordon) Tai
Hi Ninad,

Unfortunately I don’t think the provided logs shed any light here.

It does complain about:

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS

I'm not sure whether this may be related to Flink not being built against the Cloudera binaries.

Could you provide info on how exactly you’re verifying the lost messages?

Cheers,
Gordon

Re: Flink: KafkaProducer Data Loss

ninad
Thanks Gordon.

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS

, not sure if this may be related to not being built with the Cloudera binaries.

This seems normal when Kafka is down.

Could you provide info on how exactly you’re verifying the lost messages?

Our use case is pretty simple.

1) Source - Kafka (Flink task id: b93b267f087865c245babeb259c76593)
2) Group by key
3) Apply session window
4) Sink - Kafka

2, 3, 4 are assigned task id: b4a5c72b52779ab9b2b093b85b8b20c9

We bring down all Kafka brokers once Flink has received the messages from Kafka.
Flink tries to send the messages to the Kafka sink, but isn't able to because Kafka is down. This task fails:

2017-06-01 20:22:44,426 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).


Both tasks fail and this is communicated to the job manager.

Job Manager fails the job:
2017-06-01 20:22:44,500 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RUNNING to FAILING.


Job Manager restarts the job again:
2017-06-01 20:22:44,530 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job event-filter (510a7a83f509adace6704e7f2caa0b75).


At this point we expect the Flink task that sends to Kafka to be recovered, because its output wasn't successfully committed. I see some logs along those lines in the job manager logs:

2017-06-01 20:22:54,536 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.


Now, if I bring up all Kafka brokers, I expect the messages that didn't make it to the Kafka sink to be sent.

But that's not happening.

All these logs are present in the files that I attached.

I am going to try this on the standalone cluster today.

Re: Flink: KafkaProducer Data Loss

Till Rohrmann-2
Hi Ninad,

After recovery, the job should continue from where the last checkpoint was taken. Thus, it should output all remaining messages at least once to Kafka.

Could you share the complete JobManager and TaskManager logs with us? Maybe they contain some information which could be helpful to get to the bottom of the problem.

Cheers,
Till


Re: Flink: KafkaProducer Data Loss

ninad
Thanks Till. The log files I have attached are the complete logs. They are at DEBUG level. There are three files:
jobManager.log, tmOne.log and tmTwo.log.

Re: Flink: KafkaProducer Data Loss

ninad
I ran a few tests and was able to find a case where there is no data loss. Here's how the two tests differ.

The case where data loss is observed:

1) Kafka source receives data.  (Session window trigger hasn't been fired yet.)

2) Bring all Kafka brokers down.

3) Flink triggers a checkpoint. Checkpoints are successful. (Checkpoint 7)
        The Kafka source task commits the correct offsets. Not sure what was checkpointed for the session window task.
       
        Relevant logs:
        2017-06-01 20:21:48,901 DEBUG org.apache.flink.yarn.YarnTaskManager                         - Receiver TriggerCheckpoint 7@1496348508893 for b93b267f087865c245babeb259c76593.
        2017-06-01 20:21:49,011 DEBUG org.apache.flink.yarn.YarnTaskManager                         - Receiver ConfirmCheckpoint 7@1496348508893 for b93b267f087865c245babeb259c76593.
        2017-06-01 20:21:49,011 DEBUG org.apache.flink.yarn.YarnTaskManager                         - Receiver ConfirmCheckpoint 7@1496348508893 for b4a5c72b52779ab9b2b093b85b8b20c9.

       
4) Session window trigger is fired and windows are merged and evaluated.

5) Flink attempts to send data to Kafka but fails because brokers are down.

6) Flink restarts the job and restores state from checkpoint 7. However, there's no state available in the checkpoint for the session windows.
        Otherwise we would have seen the following in the logs. Right?
                HeapKeyedStateBackend     - Initializing heap keyed state backend from snapshot
                HeapKeyedStateBackend     - Restoring snapshot from state handles
               
        Relevant logs:
        Job Manager:
        2017-06-01 20:22:54,585 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.
       
        Task Manager:
        2017-06-01 20:22:57,349 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend with stream factory.

               

The case where data loss is not observed:
1) Kafka source receives data.

2) Bring all Kafka brokers down

3) Session window trigger is fired and windows are merged.

4) Flink triggers a checkpoint. Checkpoints are successful. (Checkpoint 12)
  The Kafka source task commits the correct offsets. Not sure what was checkpointed for the session window task.
 
  Relevant logs:
  2017-06-02 18:53:52,953 DEBUG org.apache.flink.yarn.YarnTaskManager                         - Receiver TriggerCheckpoint 12@1496429632943 for 138cd5e5c8065174e8e326fbb66ac4cd.
        2017-06-02 18:53:53,151 DEBUG org.apache.flink.yarn.YarnTaskManager                         - Receiver ConfirmCheckpoint 12@1496429632943 for 138cd5e5c8065174e8e326fbb66ac4cd.
        2017-06-02 18:53:53,151 DEBUG org.apache.flink.yarn.YarnTaskManager                         - Receiver ConfirmCheckpoint 12@1496429632943 for dcfb670859a7fbf4ddb90c70ed5344dd.

 
5) Session windows are evaluated.

6) Flink attempts to send data to Kafka but fails because brokers are down.

7) Flink restarts the job and restores state from checkpoint 12. State is available for the session window in the checkpoint and the task is restored with the state.

        Relevant logs:
        JobManager logs:
        2017-06-02 18:54:19,826 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring from latest valid checkpoint: Checkpoint 12 @ 1496429632943 for ea51350d9d689b2b09ab8fd2fe0f6454.

       
       TaskManager logs:
        2017-06-02 18:54:23,582 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend from snapshot.
        2017-06-02 18:54:23,583 DEBUG org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Restoring snapshot from state handles: [KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRan
        ge{startKeyGroup=0, endKeyGroup=127}, offsets=[3587, 3597, 3607, 3617, 3627, 3637, 3647, 3657, 3667, 3685, 3695, 3705, 3715, 3725, 3735, 3745, 3755, 3765, 3775, 3785, 3795, 3813, 3823, 3841, 3851, 3861, 3871, 3881, 3891
        , 3901, 3911, 3921, 3931, 3941, 3951, 3961, 3971, 3981, 3991, 4009, 4019, 4029, 15215, 15225, 15235, 15245, 15255, 15265, 15275, 15285, 15295, 16187, 16197, 16207, 16217, 16227, 16237, 16247, 16257, 17149, 17159, 17169,
         17179, 17189, 18081, 18091, 18101, 18111, 18121, 18131, 18141, 18151, 18161, 18171, 18181, 18191, 18201, 19093, 19985, 19995, 20005, 20015, 20025, 20043, 20061, 20071, 20081, 20099, 20109, 20119, 20129, 20139, 20149, 2
        0159, 20169, 20179, 20189, 20199, 20209, 20219, 20229, 20239, 20249, 20259, 20269, 20287, 20297, 20307, 20317, 20327, 20337, 20347, 33039, 33049, 33059, 33069, 34700, 34710, 34720, 34730, 34740, 34750, 34760, 34770, 34780, 34790, 34800, 34810]}, data=File State: hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/ea51350d9d689b2b09ab8fd2fe0f6454/chk-12/8373feed-7494-4b43-98ae-e1c4214b7890 [34820 bytes]}].

       

Assuming that I am correctly interpreting the logs, I can think of three possible conclusions about why we observed data loss in the first case:

1) I am missing some Flink setting.

2) Flink thought it checkpointed the window data successfully, but didn't. (We're using Cloudera Hadoop, but haven't built Flink with the Cloudera Hadoop binaries.)

3) Flink is not set up to checkpoint the data in session windows before they are merged?

I have attached the log files for the successful run with this post.

Please let us know what you guys think. Thanks for your patience.

jobManagerNoDataLoss.log tmOneNoDataLoss.log tmTwoNoDataLoss.log



Re: Flink: KafkaProducer Data Loss

ninad
I tested this with the standalone cluster, and I don't see this problem. So the problem could be that we haven't built Flink against Cloudera Hadoop? I will test that out.

Re: Flink: KafkaProducer Data Loss

Tzu-Li (Gordon) Tai
Thanks for the updates and testing efforts on this!

I’m sorry that I currently haven’t found the chance to look closely into the testing scenarios you’ve listed yet.
But please keep us updated on this thread after testing it out also with the Cloudera build.

One other suggestion for your test to make sure that some failed record is actually retried: you can add a dummy verifying operator right before the Kafka sink.
At least that way you should be able to eliminate the possibility that the Kafka sink is incorrectly ignoring failed records when checkpointing. From another look at the Kafka sink code, I’m pretty sure this shouldn’t be the case.

Many thanks,
Gordon
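
A minimal sketch of what such a dummy verifying operator could look like (assuming string records; logging a running count is just one possible way to make replays visible):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Pass-through operator placed right before the Kafka sink; it logs every record
// handed to the sink, so records replayed after a recovery become visible.
public class SinkInputVerifier extends RichMapFunction<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(SinkInputVerifier.class);
    private transient long seen;

    @Override
    public void open(Configuration parameters) {
        seen = 0;
    }

    @Override
    public String map(String value) {
        seen++;
        LOG.info("Record #{} handed to the Kafka sink: {}", seen, value);
        return value;
    }
}

Wired in as windowedStream.map(new SinkInputVerifier()).addSink(...): if the replayed records show up in this operator's log after the brokers come back but never appear in the Kafka topic, the problem is on the producer side rather than in the checkpoint recovery.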

Re: Flink: KafkaProducer Data Loss

ninad
Yeah, this seems like a problem with Flink checkpointing: Flink thinks that a checkpoint was successful when in fact it wasn't.

Re: Flink: KafkaProducer Data Loss

Till Rohrmann
Hi Ninad,

the logs for the data loss case weren't attached to the mails. Maybe you could attach them again in the same way as you did for the no data loss logs.

Cheers,
Till


Re: Flink: KafkaProducer Data Loss

ninad
Hi Till,
Attaching the logs to this post again.

Thanks.

jobManager.log tmOne.log tmTwo.log 

Re: Flink: KafkaProducer Data Loss

Tzu-Li (Gordon) Tai
Hi,

From the logs and the description of your test scenarios where data loss is and is not observed, it seems like the differentiating factor here is whether or not the session window trigger had fired before the checkpoint occurred.

It doesn’t however explain the case where your tests pass on a standalone cluster. Have you also re-tested your scenarios on Cloudera, with Flink built against the Cloudera binaries?


Re: Flink: KafkaProducer Data Loss

ninad
Not yet. Planning to do that today.