Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's 'retry' mechanism doesn't kick in until a message is added to its internal buffer.
If there's an exception before that, KafkaProducer will throw that exception, and it seems that Flink isn't handling it. In this case there will be data loss.

Related Flink code (FlinkKafkaProducerBase):

    if (logFailuresOnly) {
        callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
                }
                acknowledgeMessage();
            }
        };
    } else {
        callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null && asyncException == null) {
                    asyncException = exception;
                }
                acknowledgeMessage();
            }
        };
    }

Here are the scenarios we've identified that will cause data loss:

1) All Kafka brokers are down. In this case, before appending a message to its buffer, KafkaProducer tries to fetch metadata. If the KafkaProducer isn't able to fetch the metadata within the configured timeout, it throws an exception.
2) Memory records not writable (existing bug in the Kafka 0.9.0.1 library): https://issues.apache.org/jira/browse/KAFKA-3594

In both of the above cases, KafkaProducer won't retry, and Flink will ignore the messages. The messages aren't even logged: the exception is, but not the messages which failed.

Possible workarounds (Kafka settings):

1) A very high value for metadata timeout (metadata.fetch.timeout.ms)
2) A very high value for buffer expiry (request.timeout.ms)

We're still investigating the possible side effects of changing the above Kafka settings.

So, is our understanding correct? Or is there a way we can avoid this data loss by modifying some Flink settings?

Thanks.
|
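The two Kafka settings named as workarounds in the original post can be written down as producer properties. This is only a minimal sketch under stated assumptions: the class name, the broker address, and the timeout values are placeholders chosen for illustration, not tested recommendations, and the side effects the poster mentions are not addressed here.

```java
import java.util.Properties;

class ProducerWorkaroundConfig {

    // Builds producer properties with the two timeouts from the post raised.
    // The concrete values are illustrative assumptions only.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Workaround 1: wait longer for metadata before the send call throws.
        props.setProperty("metadata.fetch.timeout.ms", "600000");
        // Workaround 2: per the post, a higher value delays buffer (batch) expiry.
        props.setProperty("request.timeout.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker-1:9092"); // hypothetical broker address
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a real job these properties would be handed to the Flink Kafka producer when the sink is constructed. Raising the timeouts only postpones the exception; it does not change how Flink reacts once the exception is thrown.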
Hi Ninad,

thanks for reporting the issue. To me it also looks as if exceptions might go unnoticed under certain circumstances. For example, if a write operation fails, this will set the asyncException field, which is not checked until the next invoke call happens. If a checkpoint operation happens in the meantime, it will pass and mark all messages up to this point as successfully processed. Only after the checkpoint will the producer fail. And this constitutes data loss, imho.

I've looped Robert and Gordon into the conversation, as they are more familiar with the Kafka producer. Maybe they can answer your and my questions.

Cheers,
Till

On Thu, Feb 2, 2017 at 9:58 PM, ninad <[hidden email]> wrote:
|
Hi Ninad and Till,

Thank you for looking into the issue! This is actually a bug.

Till’s suggestion is correct: the producer holds a `pendingRecords` value that is incremented on each invoke() and decremented on each callback. It is used to check whether the producer needs to sync on pending callbacks on checkpoints. On each checkpoint, we should consider the checkpoint succeeded only if, after flushing, `pendingRecords == 0` and `asyncException == null` (currently, we’re only checking `pendingRecords`).

A quick fix for this is to check and rethrow async exceptions in the `snapshotState` method, both before flushing and after flushing, once `pendingRecords` becomes 0.

I’ve filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-5701.

Cheers,
Gordon

On February 3, 2017 at 6:05:23 AM, Till Rohrmann ([hidden email]) wrote:
|
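The check described in Gordon's reply above can be sketched in isolation. What follows is a simplified, single-threaded stand-in and not the actual FlinkKafkaProducerBase code: the class name, the in-memory callback queue, and the use of an unchecked exception are assumptions made so the example runs without Flink or Kafka on the classpath. Only the names `pendingRecords`, `asyncException`, `checkErroneous`, and `snapshotState` are taken from the thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified, single-threaded stand-in for the sink; not Flink's actual class.
class CheckpointingSinkSketch {
    private final Queue<Runnable> inFlight = new ArrayDeque<>(); // callbacks not yet completed
    private long pendingRecords = 0;
    private Exception asyncException = null;

    // Hands a record to the simulated client. A non-null sendError makes its callback fail.
    void invoke(String record, Exception sendError) {
        checkErroneous();
        pendingRecords++;
        inFlight.add(() -> onCompletion(sendError));
    }

    // Same logic as the non-logging callback quoted in the original post.
    private void onCompletion(Exception exception) {
        if (exception != null && asyncException == null) {
            asyncException = exception;
        }
        pendingRecords--;
    }

    // Completes all outstanding callbacks, as a blocking flush would.
    private void flush() {
        while (!inFlight.isEmpty()) {
            inFlight.poll().run();
        }
    }

    // Rethrows a recorded async error (unchecked here to keep the sketch short).
    private void checkErroneous() {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new IllegalStateException("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    // The checkpoint succeeds only if no async error is pending before or after the flush.
    void snapshotState() {
        checkErroneous();
        flush();
        if (pendingRecords != 0) {
            throw new IllegalStateException("Pending records must be zero after flush: " + pendingRecords);
        }
        checkErroneous();
    }

    public static void main(String[] args) {
        CheckpointingSinkSketch sink = new CheckpointingSinkSketch();
        sink.invoke("record-1", new RuntimeException("Batch Expired"));
        try {
            sink.snapshotState();
            System.out.println("checkpoint succeeded");
        } catch (IllegalStateException e) {
            System.out.println("checkpoint failed: " + e.getMessage());
        }
    }
}
```

The second `checkErroneous()` call is the important one in this sketch: an error that only surfaces while flushing would otherwise be noticed after the checkpoint has already been reported as successful.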
Thanks, Gordon and Till.
|
In reply to this post by Tzu-Li (Gordon) Tai
Thanks for the fix, guys. I am trying to test this with 1.1.5, but I am still seeing data loss.

Here's our use case:

1) Consume from Kafka
2) Apply session window
3) Send messages of window to Kafka

If there's a failure in step 3 because all Kafka brokers are down, we see data loss.

Here are the relevant logs:

java.lang.Exception: Could not perform checkpoint 2 for operator TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th operator in chain.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
    ... 8 more
Caused by: java.lang.Exception: Failed to snapshot function state of TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
    ... 9 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)
|
Hi Ninad,
I think that Gordon could shed some more light on this, but I suggest you update your Flink version to at least 1.2. The reason is that we are already in the process of releasing Flink 1.3 (which will probably come out today), and a lot of things have been changed, fixed, and improved since the 1.1 release. In fact, it would help us a lot if you could check whether your problem still exists in the upcoming 1.3 release.

In addition, I suppose that the 1.1 release will soon no longer be supported.

Cheers,
Kostas

On Jun 1, 2017, at 12:15 AM, ninad <[hidden email]> wrote:
|
Hi Ninad,

The exception you’re seeing does not cause data loss. As a matter of fact, it’s preventing data loss, based on how Flink’s checkpoints / fault tolerance work.

So, a recap of what the problem was when this “uncaught exception leak” issue was first reported: prior to the fix, on checkpoints the Flink Kafka producer did not check for any async produce errors, therefore voiding the at-least-once guarantee of the sink. In other words, the checkpoint was incorrectly succeeding without respecting that some previous data wasn’t sent to Kafka.

The fix included in 1.1.5 / 1.2.1 basically corrects this by rethrowing any async errors that occurred before the checkpoint happened, and fails the checkpoint snapshot (which is what you are observing from this exception).

When a failure occurs in the job, Flink uses the last completed checkpoint to restart the job. In the case of the Flink Kafka producer, this essentially makes sure that records which did not make it into Kafka and caused the last run to fail are reprocessed and sent to Kafka again.

Hope this helps!
Gordon
On 1 June 2017 at 12:15:47 PM, Kostas Kloudas ([hidden email]) wrote:
|
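The recap above, a checkpoint that wrongly completes versus one that fails and triggers a replay, can be illustrated with a toy simulation. This is a sketch under strong simplifying assumptions and has nothing to do with Flink's real runtime: the class and method names are hypothetical, the "topic" is an in-memory list, the source offset is the only checkpointed state, and a failed flush is assumed to deliver nothing at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReplaySketch {

    // Runs a toy source -> sink job and returns what ends up in the topic.
    // The first failedFlushes checkpoint-time flushes fail (brokers down).
    // checkpointEvery must be positive.
    static List<String> run(List<String> input, int checkpointEvery,
                            int failedFlushes, boolean checkErrorsOnCheckpoint) {
        List<String> delivered = new ArrayList<>(); // records that reached the topic
        List<String> buffer = new ArrayList<>();    // sent but not yet acknowledged
        int lastCompletedOffset = 0;                // source offset of the last completed checkpoint
        int offset = 0;
        int remainingFailures = failedFlushes;

        while (offset < input.size()) {
            buffer.add(input.get(offset));
            offset++;
            boolean checkpointDue = offset % checkpointEvery == 0 || offset == input.size();
            if (!checkpointDue) {
                continue;
            }
            if (remainingFailures > 0) {
                remainingFailures--;
                buffer.clear(); // the failed batch never reaches the topic
                if (checkErrorsOnCheckpoint) {
                    offset = lastCompletedOffset; // checkpoint fails, job restarts and replays
                } else {
                    lastCompletedOffset = offset; // pre-fix behavior: checkpoint wrongly completes
                }
            } else {
                delivered.addAll(buffer);
                buffer.clear();
                lastCompletedOffset = offset;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d");
        System.out.println("with check:    " + run(input, 2, 1, true));
        System.out.println("without check: " + run(input, 2, 1, false));
    }
}
```

With the check enabled, the failed batch is replayed from the last completed checkpoint and all four records arrive. Without it, the first batch is lost because the source offset has already moved past it. A real job can also see duplicates after a replay, which is why the guarantee is at-least-once; this toy model does not show that.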
Thanks Gordon and Kostas.
Gordon,

"When a failure occurs in the job, Flink uses the last completed checkpoint to restart the job. In the case of the Flink Kafka producer, this essentially makes sure that records which did not make it into Kafka and caused the last run to fail are reprocessed and sent to Kafka again."

This is exactly what we were expecting. Thanks for confirming. However, we still do not see the messages in Kafka. All the Kafka properties are as expected:

Replication: 3
Min ISR: 2
acks: all

We also tried this with Flink 1.2.1. We haven't tested this with the standalone configuration yet. We will test it to see if the result is different.

That being said, we're running this on a Cloudera YARN/Hadoop cluster, but we haven't built Flink against the Cloudera binaries. The logs certainly don't indicate that being the problem.

Please let us know how we can troubleshoot this.

I have attached the JobManager and TaskManager log files for reference.

Relevant logs from the log files:

Job Manager

2017-06-01 20:22:44,499 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.
2017-06-01 20:22:44,530 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job event-filter (510a7a83f509adace6704e7f2caa0b75).
2017-06-01 20:22:44,534 INFO org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter - Delaying retry of job execution for 10000 ms ...
2017-06-01 20:22:48,233 DEBUG org.apache.flink.runtime.metrics.dump.MetricDumpSerialization - Failed to serialize gauge.
2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex Source: Custom Source (1/1) for new execution.
2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) for new execution.
2017-06-01 20:22:54,535 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RESTARTING to CREATED.
2017-06-01 20:22:54,536 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.
2017-06-01 20:22:54,591 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state CREATED to RUNNING.

Task Manager 1

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS
2017-06-01 20:22:44,426 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).
2017-06-01 20:22:44,427 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}

Task Manager 2

2017-06-01 20:22:54,741 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Custom Source (1/1) (8ee2c8a628968bc3f8006f0740bb8ad1): Initialized ResultPartition 8d68b9c00d6a329d70ee2bf1ed320318@8ee2c8a628968bc3f8006f0740bb8ad1 [PIPELINED, 1 subpartitions, 1 pending references]
2017-06-01 20:22:54,760 INFO org.apache.flink.yarn.YarnTaskManager - Received task Source: Custom Source (1/1)
2017-06-01 20:27:30,388 WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {topic.event.filter=LEADER_NOT_AVAILABLE}

jobManager.log tmOne.log tmTwo.log
|
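Hi Ninad,

Unfortunately I don’t think the provided logs shed any light here.

It does complain about:

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS

Not sure if this may be related to not being built with the Cloudera binaries.

Could you provide info on how exactly you’re verifying the lost messages?

Gordon

On 1 June 2017 at 11:14:17 PM, ninad ([hidden email]) wrote: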
|
Thanks Gordon.
> 2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS
> Not sure if this may be related to not being built with the Cloudera binaries.

This seems normal when Kafka is down.

> Could you provide info on how exactly you’re verifying the lost messages?

Our use case is pretty simple:

1) Source - Kafka (Flink task id: b93b267f087865c245babeb259c76593)
2) Group by key
3) Apply session window
4) Sink - Kafka

2, 3, 4 are assigned task id: b4a5c72b52779ab9b2b093b85b8b20c9

We bring down all Kafka brokers once Flink has received the messages from Kafka. Flink tries to send the messages to the Kafka sink, but isn't able to because Kafka is down. This task fails:

2017-06-01 20:22:44,426 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).

Both tasks fail and this is communicated to the Job Manager.

The Job Manager fails the job:

2017-06-01 20:22:44,500 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RUNNING to FAILING.

The Job Manager restarts the job again:

2017-06-01 20:22:44,530 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job event-filter (510a7a83f509adace6704e7f2caa0b75).

At this point we're expecting that the Flink task that sends to Kafka should be recovered, because it wasn't successfully committed. I see some similar logs in the Job Manager logs:

2017-06-01 20:22:54,536 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.

Now, if I bring up all Kafka brokers, I am expecting that the messages which didn't make it to the Kafka sink should be sent. But that's not happening.

All these logs are present in the files that I attached.

I am going to try this on the standalone cluster today.
|
Hi Ninad,

After recovery, the job should continue from where the last checkpoint was taken. Thus, it should output all remaining messages at least once to Kafka.

Could you share the complete JobManager and TaskManager logs with us? Maybe they contain some information which could be helpful to get to the bottom of the problem.

Cheers,
Till

On Fri, Jun 2, 2017 at 4:32 PM, ninad <[hidden email]> wrote:
|
Thanks Till. The log files I have attached are the complete logs. They are DEBUG level. There are three files: jobManager.log, tmOne.log and tmTwo.log.
|
In reply to this post by Till Rohrmann-2
I ran a few tests and was able to find a case where there is no data loss. Here's how the two tests differ.

The case where data loss is observed:

1) Kafka source receives data. (Session window trigger hasn't been fired yet.)
2) Bring all Kafka brokers down.
3) Flink triggers a checkpoint. Checkpoints are successful. (Checkpoint 7) Kafka source task commits the correct offsets. Not sure what was checkpointed for the session window task.

Relevant logs:

2017-06-01 20:21:48,901 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver TriggerCheckpoint 7@1496348508893 for b93b267f087865c245babeb259c76593.
2017-06-01 20:21:49,011 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 7@1496348508893 for b93b267f087865c245babeb259c76593.
2017-06-01 20:21:49,011 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 7@1496348508893 for b4a5c72b52779ab9b2b093b85b8b20c9.

4) Session window trigger is fired and windows are merged and evaluated.
5) Flink attempts to send data to Kafka but fails because brokers are down.
6) Flink restarts the job and restores state from checkpoint 7. However, there's no state available in the checkpoint for the session windows. Otherwise we would have seen the following in the logs. Right?

HeapKeyedStateBackend - Initializing heap keyed state backend from snapshot
HeapKeyedStateBackend - Restoring snapshot from state handles

Relevant logs:

Job Manager:
2017-06-01 20:22:54,585 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.

Task Manager:
2017-06-01 20:22:57,349 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.

The case where data loss is not observed:

1) Kafka source receives data.
2) Bring all Kafka brokers down.
3) Session window trigger is fired and windows are merged.
4) Flink triggers a checkpoint. Checkpoints are successful. (Checkpoint 12) Kafka source task commits the correct offsets. Not sure what was checkpointed for the session window task.

Relevant logs:

2017-06-02 18:53:52,953 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver TriggerCheckpoint 12@1496429632943 for 138cd5e5c8065174e8e326fbb66ac4cd.
2017-06-02 18:53:53,151 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 12@1496429632943 for 138cd5e5c8065174e8e326fbb66ac4cd.
2017-06-02 18:53:53,151 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 12@1496429632943 for dcfb670859a7fbf4ddb90c70ed5344dd.

5) Session windows are evaluated.
6) Flink attempts to send data to Kafka but fails because brokers are down.
7) Flink restarts the job and restores state from checkpoint 12. State is available for the session window in the checkpoint and the task is restored with the state.

Relevant logs:

JobManager logs:
2017-06-02 18:54:19,826 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 12 @ 1496429632943 for ea51350d9d689b2b09ab8fd2fe0f6454.

TaskManager logs:
2017-06-02 18:54:23,582 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend from snapshot.
2017-06-02 18:54:23,583 DEBUG org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Restoring snapshot from state handles: [KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, offsets=[3587, 3597, 3607, 3617, 3627, 3637, 3647, 3657, 3667, 3685, 3695, 3705, 3715, 3725, 3735, 3745, 3755, 3765, 3775, 3785, 3795, 3813, 3823, 3841, 3851, 3861, 3871, 3881, 3891, 3901, 3911, 3921, 3931, 3941, 3951, 3961, 3971, 3981, 3991, 4009, 4019, 4029, 15215, 15225, 15235, 15245, 15255, 15265, 15275, 15285, 15295, 16187, 16197, 16207, 16217, 16227, 16237, 16247, 16257, 17149, 17159, 17169, 17179, 17189, 18081, 18091, 18101, 18111, 18121, 18131, 18141, 18151, 18161, 18171, 18181, 18191, 18201, 19093, 19985, 19995, 20005, 20015, 20025, 20043, 20061, 20071, 20081, 20099, 20109, 20119, 20129, 20139, 20149, 20159, 20169, 20179, 20189, 20199, 20209, 20219, 20229, 20239, 20249, 20259, 20269, 20287, 20297, 20307, 20317, 20327, 20337, 20347, 33039, 33049, 33059, 33069, 34700, 34710, 34720, 34730, 34740, 34750, 34760, 34770, 34780, 34790, 34800, 34810]}, data=File State: hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/ea51350d9d689b2b09ab8fd2fe0f6454/chk-12/8373feed-7494-4b43-98ae-e1c4214b7890 [34820 bytes]}].

Assuming that I am correctly interpreting the logs, I can think of three possible conclusions about why we observed data loss in the first case:

1) I am missing some Flink setting.
2) Flink thought it checkpointed the window data successfully, but didn't. (We're using Cloudera Hadoop, but haven't built Flink with the Cloudera Hadoop binaries.)
3) Flink is not set up to checkpoint the data in session windows before they are merged?

I have attached the log files for the successful run with this post.

Please let us know what you guys think. Thanks for your patience.

jobManagerNoDataLoss.log tmOneNoDataLoss.log tmTwoNoDataLoss.log
|
I tested this with the standalone cluster, and I don't see this problem. So, the problem could be that we haven't built Flink against cloudera Hadoop? I will test it out.
|
Thanks for the updates and testing efforts on this!

I’m sorry that I haven’t yet found the chance to look closely into the testing scenarios you’ve listed. But please keep us updated on this thread after testing it out with the Cloudera build as well.

One other suggestion for your test, to make sure that some failed record is actually retried: you can add a dummy verifying operator right before the Kafka sink. At least that way you should be able to eliminate the possibility that the Kafka sink is incorrectly ignoring failed records when checkpointing. From another look at the Kafka sink code, I’m pretty sure this shouldn’t be the case.

Gordon
On 4 June 2017 at 2:14:40 PM, ninad ([hidden email]) wrote:
|
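The "dummy verifying operator" suggested above could look roughly like the pass-through step below. This is a hedged, dependency-free sketch: the class name and the in-memory list are assumptions for illustration, and in an actual Flink job the same logic would sit in a map step placed directly in front of the Kafka sink and write to the job's logger instead of standard output.

```java
import java.util.ArrayList;
import java.util.List;

// Pass-through step that records every element it sees on its way to the sink.
class VerifyingPassThrough<T> {
    private final List<T> seen = new ArrayList<>();

    // Same shape as a map function: logs the element and returns it unchanged.
    T map(T value) {
        seen.add(value);
        System.out.println("before sink: " + value);
        return value;
    }

    // Copy of everything observed so far, for comparison with the topic contents.
    List<T> seen() {
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        VerifyingPassThrough<String> verifier = new VerifyingPassThrough<>();
        verifier.map("event-1"); // first attempt, before the simulated failure
        verifier.map("event-1"); // the same record showing up again after a restart
        System.out.println("observed " + verifier.seen().size() + " elements");
    }
}
```

Comparing this step's output across a restart with what arrives in the topic would help narrow things down: a record that reappears here but never reaches Kafka points at the sink, while a record that never reappears here points at the state restored from the checkpoint.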
Yeah, this seems like a problem with Flink checkpointing: Flink thinks that a checkpoint was successful when in fact it wasn't.

On Jun 4, 2017 7:37 AM, "Tzu-Li (Gordon) Tai [via Apache Flink User Mailing List archive.]" <[hidden email]> wrote:
|
Hi Ninad, the logs for the data loss case weren't attached to the mails. Maybe you could attach them again in the same way as you did for the no data loss logs. Cheers, Till On Sun, Jun 4, 2017 at 2:55 PM, ninad <[hidden email]> wrote:
|
Hi, From the logs and the description of your test scenarios where data loss is observed and not observed, it seems like the differentiating factor here is whether or not the session windows trigger was first fired when the checkpoint occurred. It doesn’t however explain the case where your tests pass on a standalone cluster. Have you also re-tested your scenarios on Cloudera, with Flink built against the Cloudera binaries? On 6 June 2017 at 4:53:30 PM, ninad ([hidden email]) wrote:
|
Not yet. Planning to do that today.
|