Flink application has slight data loss using Processing Time

Flink application has slight data loss using Processing Time

rainieli
Hello Flink Community,

Our Flink application runs on v1.9. Its basic logic is to consume one large Kafka topic, filter on some fields, and produce the results to a new Kafka topic.
After comparing counts of the original Kafka topic and the generated Kafka topic on the same field using a Presto query, we see a slight data loss (around 1.37220156e-7 per hour).
The original Kafka topic collects data from mobile devices, so it can have late-arriving events. That's why we use processing time, since order does not matter.

This job uses processing time; any idea what could potentially cause this data loss?
Also, if Flink uses processing time, what is the default time window? Could the default time window cause it?

Any suggestions are appreciated.
Thanks
Best regards
Rainie
Re: Flink application has slight data loss using Processing Time

Smile
Hi Rainie,

Could you please provide more information about your processing logic?
Do you use window operators?
If there's no time-based operator in your logic, late-arriving data won't be
dropped by default, and there might instead be something wrong with your flatMap
or filter operator. Otherwise, you can use sideOutputLateData() to capture the
late data of a window and have a look at it. See [1] for more information
about sideOutputLateData().

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output 
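
For reference, here is a minimal sketch of wiring up sideOutputLateData() on an
event-time window. The Event and Result types, the key selector, and
MyWindowFunction are hypothetical placeholders, not anything from your job:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so the OutputTag keeps its generic type information.
final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> windowed = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .sideOutputLateData(lateTag)       // route late records here instead of dropping them
    .process(new MyWindowFunction());  // hypothetical ProcessWindowFunction

// Late records can then be logged or written to a side topic for inspection.
DataStream<Event> lateEvents = windowed.getSideOutput(lateTag);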

Regards,
Smile



Re: Flink application has slight data loss using Processing Time

rainieli
Thanks for the quick response, Smile.
I don't use window operators or flatMap.
Here is the core logic of my filter; it only iterates over the filters list. Could rebalance() cause it?

Thanks again.
Best regards
Rainie
SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
    eventStream
        .rebalance()
        .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
            @Override
            public void processElement(
                    T element,
                    ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
                    Collector<SplitterIntermediateRecord<T>> collector) {
                // Fan out each element to every substream whose filter matches it.
                for (StreamFilter filter : filters) {
                    if (filter.match(element)) {
                        SubstreamConfig substreamConfig = filter.getSubstreamConfig();
                        SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
                            substreamConfig.getKafkaCluster(),
                            substreamConfig.getKafkaTopic(),
                            substreamConfig.getCutoverKafkaTopic(),
                            substreamConfig.getCutoverTimestampInMs(),
                            element);
                        collector.collect(result);
                    }
                }
            }
        })
        .name("Process-" + eventClass.getSimpleName());

Re: Flink application has slight data loss using Processing Time

Yun Gao
Hi Rainie,

From the code, it seems the current job does not use time-related functionality like
windows or timers. If so, the problem would be independent of the time type used.

Also, it is not likely due to rebalance(), since the network layer checks sequence
numbers. If records were lost there, it would trigger a failover.

Since the current logic does not seem to rely on much complex functionality, could it
be that there is some inconsistency between the Flink implementation and the Presto one?

Best,
Yun


Re: Flink application has slight data loss using Processing Time

David Anderson-4
Rainie,

Were there any failures/restarts, or is this discrepancy observed without any disruption to the processing?

Regards,
David

Re: Flink application has slight data loss using Processing Time

rainieli
Thanks Yun and David.
Some tasks got restarted; we configured a restart policy, so the job itself didn't fail.
Will a task restart cause data loss?

Thanks
Rainie


Re: Flink application has slight data loss using Processing Time

David Anderson-4
Rainie,

A restart after a failure can cause data loss if you aren't using checkpointing, or if you experience a transaction timeout. 

A manual restart can also lead to data loss, depending on how you manage the offsets, transactions, and other state during the restart. What happened in this case?
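
As a minimal sketch of the checkpointing side of this (the interval and limits
below are illustrative placeholders, not recommendations), enabling checkpointing
looks like:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 60s; on failure the job rolls back to the last
// completed checkpoint, including the Kafka consumer offsets stored in it.
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
// Leave some breathing room between checkpoints and bound how long one may run.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(600_000);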

David

Re: Flink application has slight data loss using Processing Time

rainieli
Thanks for the info, David.
The job has checkpointing enabled.
I saw some tasks fail due to "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka".
Here is the stack trace from the JM log:

container_e17_1611597945897_8007_01_000240 @ worker-node-host (dataPort=42321).
2021-02-10 01:19:27,206 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has passed since last append
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
... 18 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42 record(s) for frontend_event_core-38: 116447 ms has passed since last append
2021-02-10 01:19:27,216 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Best regards
Rainie

Re: Flink application has slight data loss using Processing Time

Arvid Heise-4
Hi Rainie,

This looks like the record batching in the Kafka producer timed out. At that point, the respective records are lost forever. You probably want to tweak your Kafka settings [1].

Usually, Flink should fail and restart at this point and recover without data loss. However, if the transactions are also timing out, that may explain the data loss. So you probably also want to increase the transaction timeout.
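
As a rough sketch of the producer settings involved (the values below are
illustrative assumptions, not tuned recommendations):

import java.util.Properties;

Properties producerProps = new Properties();
// Batches that wait longer than this without a broker response are expired,
// which is what produces the "Expiring N record(s) ... ms has passed since
// last append" error on older Kafka clients.
producerProps.setProperty("request.timeout.ms", "120000");
// How many times a failed send is retried before the batch is failed.
producerProps.setProperty("retries", "5");
// Transaction timeout used by the producer in exactly-once mode; it must not
// exceed the broker's transaction.max.timeout.ms.
producerProps.setProperty("transaction.timeout.ms", "3600000"); // 1 hour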


Re: Flink application has slight data loss using Processing Time

rainieli
Thanks for the suggestion, Arvid.
Currently my job uses producer.kafka.request.timeout.ms=90000.
I will try increasing it to 120000.

Best regards
Rainie

Re: Flink application has slight data loss using Processing Time

rainieli
Hi Arvid,

After increasing producer.kafka.request.timeout.ms from 90000 to 120000,
the job ran fine for almost 5 days, but one of the tasks recently failed again with the same timeout error (stack trace attached below).
Should I keep increasing the producer.kafka.request.timeout.ms value?

Thanks again for the help.
Best regards
Rainie

Stacktrace:
{job_name}/{job_id}/chk-43556/_metadata, reference=(default), fileStateSizeThreshold=2048, writeBufferSize=4096}, synchronous part) in thread Thread[Process-Event -> Filter-data08 (237/240),5,Flink Task Threads] took 0 ms.
Canceler.run(Task.java:1434)
... 1 more
2021-03-17 17:46:42,284 INFO  org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Could not complete snapshot 43556 for operator Sink: Sink-data08 (237/240).
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 53 record(s) for frontend_event_core-46: 122269 ms has passed since last append
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
  at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
  at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
  at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
  at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
  at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
  at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
  at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
  at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
  at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
  at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
  at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 53 record(s) for frontend_event_core-46: 122269 ms has passed since last append
2021-03-17 17:46:42,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Source: {operator name}-Event (214/240) (69ec48cfb074de8812eb622ffa097233).
2021-03-17 17:46:42,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: {operator name}-Event (214/240) (69ec48cfb074de8812eb622ffa097233) switched from RUNNING to CANCELING.
2021-03-17 17:46:42,355 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: {operator name}-Event (214/240) (69ec48cfb074de8812eb622ffa097233).
2021-03-17 17:46:42,358 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error while canceling task.
java.lang.Exception: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
  at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
  at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
  at com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:436)
  at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
  at org.apache.flink.runtime.taskmanager.Task$Task
2021-03-17 17:46:42,391 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Process-Event -> Filter-data08 (237/240) (34cf65b29f7153d6f0a82819eeaf218d).


Re: Flink application has slight data loss using Processing Time

David Anderson-4
You should increase the Kafka transaction timeout -- transaction.max.timeout.ms -- to something much larger than the default, which I believe is 15 minutes. Suitable values are more on the order of a few hours to a few days -- long enough to allow for any conceivable outage. This way, if a request does time out and causes the Flink job to fail, then as long as Kafka and Flink recover within the transaction timeout, you won't lose any data.
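
To make the relationship concrete, here is a sketch under placeholder values:
transaction.max.timeout.ms is a broker-side setting (server.properties), and the
producer-side transaction.timeout.ms passed to the Flink Kafka producer must stay
at or below it:

import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka-broker:9092");  // hypothetical address
// How long an open transaction may live before the broker aborts it. The
// broker's transaction.max.timeout.ms (default 15 minutes) must be raised to
// at least this value first, or the producer will fail to initialize.
props.setProperty("transaction.timeout.ms", String.valueOf(6 * 60 * 60 * 1000)); // 6 hours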

Regards,
David



  at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
  at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
  at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
  at com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:436)
  at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
  at org.apache.flink.runtime.taskmanager.Task$Task
2021-03-17 17:46:42,391 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Process-Event -> Filter-data08 (237/240) (34cf65b29f7153d6f0a82819eeaf218d).


On Thu, Mar 11, 2021 at 7:12 AM Rainie Li <[hidden email]> wrote:
Thanks for the suggestion, Arvid.
Currently my job is using producer.kafka.request.timeout.ms=90000.
I will try increasing it to 120000.

Best regards
Rainie

On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise <[hidden email]> wrote:
Hi Rainie,

This looks like record batching in the Kafka producer timed out. At this point, the affected records are lost forever. You probably want to tweak your Kafka settings [1].

Usually, Flink should fail and restart at this point and recover without data loss. However, if the transactions are also timing out, that may explain the data loss. So you probably also want to increase the transaction timeout.
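
For illustration, those knobs map roughly onto the following producer properties (the values are invented, and which property governs batch expiry depends on the Kafka client version bundled with the connector):

import java.util.Properties;

Properties props = new Properties();
// How long to wait for a broker acknowledgement before a batch may expire
props.setProperty("request.timeout.ms", "120000");
// Clients >= 2.1: total upper bound from send() until success or failure
props.setProperty("delivery.timeout.ms", "300000");
// Keep retrying through short broker hiccups instead of failing fast
props.setProperty("retries", "2147483647");
// Transactional producer timeout; must stay <= the broker's transaction.max.timeout.ms
props.setProperty("transaction.timeout.ms", "3600000");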


On Mon, Mar 8, 2021 at 8:34 PM Rainie Li <[hidden email]> wrote:
Thanks for the info, David.
The job has checkpointing. 
I saw some tasks fail due to "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka".
Here is the stack trace from the JM log:

container_e17_1611597945897_8007_01_000240 @ worker-node-host (dataPort=42321).
2021-02-10 01:19:27,206 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has passed since last append
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
... 18 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 42 record(s) for frontend_event_core-38: 116447 ms has passed since last append
2021-02-10 01:19:27,216 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job (7dab4c1a1c6984e70732b8e3f218020f) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1410)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1320)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:689)
at org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:573)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Best regards
Rainie

On Mon, Mar 8, 2021 at 11:09 AM David Anderson <[hidden email]> wrote:
Rainie,

A restart after a failure can cause data loss if you aren't using checkpointing, or if you experience a transaction timeout. 

A manual restart can also lead to data loss, depending on how you manage the offsets, transactions, and other state during the restart. What happened in this case?
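
For context, the safety net described here boils down to something like the following (the interval and retry counts are arbitrary):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Snapshot source offsets and operator state every 60s in exactly-once mode
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
// On failure, restart up to 3 times with a 10s pause instead of giving up
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

Checkpointing protects the source offsets; exactly-once all the way into Kafka additionally depends on a transactional sink and the transaction timeouts discussed in this thread.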

David

On Mon, Mar 8, 2021 at 7:53 PM Rainie Li <[hidden email]> wrote:
Thanks Yun and David.
Some tasks got restarted. We configured a restart policy, so the job itself didn't fail.
Will task restarts cause data loss?

Thanks
Rainie


On Mon, Mar 8, 2021 at 10:42 AM David Anderson <[hidden email]> wrote:
Rainie,

Were there any failures/restarts, or is this discrepancy observed without any disruption to the processing?

Regards,
David

Reply | Threaded
Open this post in threaded view
|

Re: Flink application has slightly data loss using Processing Time

rainieli
I will try that.
Thanks for your help, David.

Best regards
Rainie
