Re: Fink: KafkaProducer Data Loss

Posted by ninad on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13597.html

I built Flink v1.3.0 with Cloudera Hadoop and am able to reproduce the data loss.

Here are the details:

tmOneCloudera583.log

Received session window task:
2017-06-08 15:10:46,131 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) -> Sink: sink.http.sep (1/1) (3ef2f947096f3b0986b8ad334c7d5d3c) switched from CREATED to DEPLOYING.

Finished checkpoint 2 (Synchronous part)
2017-06-08 15:15:51,982 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) -> Sink: sink.http.sep (1/1) - finished synchronous part of checkpoint 2.Alignment duration: 0 ms, snapshot duration 215 ms


The task failed before the completion of checkpoint 2 could be confirmed,
i.e., I don't see the log line "Notification of complete checkpoint for task TriggerWindow" for checkpoint 2.
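
For anyone who wants to check the same thing in their own task manager log, here is a minimal sketch in Python. It is not part of Flink: the function name `checkpoint_confirmed` is made up for illustration, and it matches only on the two message fragments quoted above. It also assumes the notification line does not carry the checkpoint id, so it accepts any notification for the task that is logged after the synchronous part finished.

```python
import re

# Fragment of the DEBUG line logged when the synchronous part of a checkpoint ends.
SYNC_DONE = re.compile(r"finished synchronous part of checkpoint (\d+)")
# Fragment of the line expected once the checkpoint is confirmed to the task.
NOTIFIED = "Notification of complete checkpoint for task"


def checkpoint_confirmed(lines, checkpoint_id, task_name):
    """Return True if a completion notification for task_name appears
    after the line reporting the synchronous part of checkpoint_id."""
    sync_seen = False
    for line in lines:
        match = SYNC_DONE.search(line)
        if match and int(match.group(1)) == checkpoint_id and task_name in line:
            sync_seen = True
            continue
        # Assumption: the first notification after the sync part belongs
        # to this checkpoint, since the line itself has no checkpoint id.
        if sync_seen and NOTIFIED in line and task_name in line:
            return True
    return False
```

Run over the lines of tmOneCloudera583.log with checkpoint 2 and "TriggerWindow", this should return False for the run described here.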

jmCloudera583.log

Job Manager received acks for checkpoint 2

2017-06-08 15:15:51,898 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 2 from task 3e980e4d3278ef0d0f5e0fa9591cc53d of job 3f5aef5e15a23bce627c05c94760fb16.
2017-06-08 15:15:51,982 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 2 from task 3ef2f947096f3b0986b8ad334c7d5d3c of job 3f5aef5e15a23bce627c05c94760fb16.
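
To see which tasks acknowledged which checkpoint without reading the job manager log by hand, something like the following could be used. It is only a sketch in Python: the helper name `acks_by_checkpoint` is hypothetical, and the pattern is taken from the two acknowledge lines above, so it may need adjusting for other Flink versions.

```python
import re
from collections import defaultdict

# Pattern copied from the CheckpointCoordinator DEBUG lines shown above.
ACK = re.compile(
    r"Received acknowledge message for checkpoint (\d+) "
    r"from task ([0-9a-f]+) of job ([0-9a-f]+)"
)


def acks_by_checkpoint(lines):
    """Map each checkpoint id to the set of task ids that acknowledged it."""
    acks = defaultdict(set)
    for line in lines:
        match = ACK.search(line)
        if match:
            acks[int(match.group(1))].add(match.group(2))
    return dict(acks)
```

For the two lines above, this gives one entry for checkpoint 2 containing both task ids, including 3ef2f947096f3b0986b8ad334c7d5d3c, which is the session window task.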

Job Manager tried to restore from checkpoint 2.

2017-06-08 15:16:02,111 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
2017-06-08 15:16:02,111 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 2.
2017-06-08 15:16:02,122 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring from latest valid checkpoint: Checkpoint 2 @ 1496934766105 for 3f5aef5e15a23bce627c05c94760fb16.


tmTwoCloudera583.log

Task Manager tried to restore the data and was successful.

2017-06-08 15:16:02,258 DEBUG org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Restoring snapshot from state handles: [KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, offsets=[13460, 13476, 13492, 13508, 13524, 13540, 13556, 13572, 13588, 13604, 13620, 13636, 13652, 13668, 13684, 14566, 14582, 14598, 14614, 14630, 14646, 14662, 14678, 14694, 14710, 14726, 14742, 14758, 14774, 14790, 14806, 14822, 14838, 14854, 14870, 14886, 14902, 14918, 14934, 14950, 14966, 14982, 14998, 15014, 15030, 15046, 15062, 15078, 15094, 15110, 15126, 15142, 15158, 15174, 15190, 15206, 16088, 16104, 16120, 16136, 28818, 28834, 28850, 28866, 28882, 28898, 28914, 28930, 28946, 28962, 28978, 28994, 29010, 29026, 29042, 29058, 29074, 29090, 29106, 29122, 29138, 29154, 29170, 40346, 40362, 40378, 40394, 40410, 40426, 40442, 40458, 40474, 40490, 40506, 40522, 40538, 41420, 41436, 41452, 41468, 41484, 41500, 41516, 41532, 41548, 41564, 41580, 41596, 41612, 41628, 41644, 41660, 41676, 41692, 41708, 41724, 41740, 41756, 41772, 41788, 41804, 41820, 41836, 41852, 41868, 41884, 41900, 41916]}, data=File State: hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/3f5aef5e15a23bce627c05c94760fb16/chk-2/aa076014-120b-4955-ab46-89094358ebeb [41932 bytes]}].

But apparently the restored state didn't contain all the messages the window had received:
a few messages were not replayed, so the Kafka sink didn't receive all of them.

Attaching the files here.

jmCloudera583.log tmOneCloudera583.log tmTwoCloudera583.log 

Thanks for your patience, guys.