Kafka consumer does not commit offsets at checkpoint

andy

Hi all, 

I reported this as a bug, but it seems to be a configuration problem on my side: https://issues.apache.org/jira/browse/FLINK-11335, so I am resending it to the mailing list.

My env:

AWS EMR 5.20: hadoop, flink plugin

flink: 1.6.2/1.7.0

run under yarn-cluster

Kafka cluster: 1.0

I confirmed:

- No offsets are committed back to the Kafka cluster. I checked the current offset of each partition using kk-man (kafka manager) and the kafka-consumer.sh script.
- Checkpoints finish successfully, based on the task manager log (see below).
- I'm using AWS S3 as the state backend.
- The connection between Kafka and the task manager nodes is normal.
- I increased the checkpoint interval (10 sec) to make sure there is enough time to commit.
- setCommitOffsetsOnCheckpoints is enabled.

Observed behavior:
- Redeploying the app makes it reprocess the messages again, no matter how long it ran before, because no offsets were committed.

How I deploy/redeploy the app in the YARN cluster:
- flink run -m yarn-cluster  -yid application_1551327455160_0002  pp_flink_convoy-1.1.jar
- flink cancel 6b109a9f7c5d3cab40a31118a128c1e6 -yid application_1551327455160_0002      # 6b109a9f7c5d3cab40a31118a128c1e6 is the job id
- flink run -m yarn-cluster  -yid application_1551327455160_0002  pp_flink_convoy-1.1.jar
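
The redeploy above runs the jar again without pointing at a savepoint or retained checkpoint, so the consumer has no restored state to start from. As far as I understand the documented behaviour of setStartFromGroupOffsets, the start position then comes from the group's committed offsets, and falls back to the auto.offset.reset policy when none are found. A minimal sketch of that precedence follows; the object, type, and method names are hypothetical and are not part of Flink's API.

```scala
// Hypothetical model of how a start position is chosen for one partition.
// Assumed precedence (not Flink source code):
//   1. offset restored from a checkpoint/savepoint,
//   2. offset committed in Kafka for the consumer group,
//   3. the auto.offset.reset policy.
object StartOffset {
  sealed trait Start
  final case class FromOffset(offset: Long) extends Start
  final case class FromReset(policy: String) extends Start

  def resolve(restored: Option[Long],
              committed: Option[Long],
              autoOffsetReset: String): Start =
    restored.orElse(committed) match {
      case Some(offset) => FromOffset(offset)
      case None         => FromReset(autoOffsetReset)
    }
}
```

With neither restored state nor a committed offset, for example StartOffset.resolve(None, None, "earliest"), the result depends only on the reset policy. That would be consistent with messages being reprocessed on every redeploy if the policy in effect is "earliest", which is an assumption here since the simplified code does not show it.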

When the app tries to commit offsets to Kafka, it always logs this warning:

2019-01-15 11:18:55,405 WARN  org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
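
One way to read this warning: the offsets of the latest completed checkpoint replace any older offsets that are still waiting to be committed, so if the committer never gets a chance to run in between, the older ones are skipped. Below is a minimal sketch of such a single-slot "latest wins" handoff. It is a simplified model and not Flink's actual code; PendingCommitSlot, offer, and take are made-up names.

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified, hypothetical model of a single pending-commit slot shared by
// a checkpoint-notification thread and a commit thread. Not Flink source code.
object PendingCommitSlot {
  type Offsets = Map[Int, Long] // partition -> offset to commit

  private val pending = new AtomicReference[Option[Offsets]](None)

  /** Called when a checkpoint completes. Returns true if an older request
    * was still waiting and has been overwritten (the warning case). */
  def offer(offsets: Offsets): Boolean = {
    val previous = pending.getAndSet(Some(offsets))
    val skipped = previous.isDefined
    if (skipped)
      println("WARN skipping commit of previous offsets, newer checkpoint offsets are available")
    skipped
  }

  /** Called by the commit thread whenever it is free; takes and clears the slot. */
  def take(): Option[Offsets] = pending.getAndSet(None)
}
```

In this model, calling offer twice before take is ever called overwrites the first request, and take then returns only the newer offsets. If take is never reached, nothing is committed at all.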

The code, simplified by removing the business logic:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
    // Checkpoint every 6000 ms in at-least-once mode
    env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Keep the externalized checkpoint when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    val properties = new Properties()
    properties.setProperty("group.id", "my_groupid")

    // Start from the group's committed offsets and commit offsets on checkpoints
    val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
      new JSONKeyValueDeserializationSchema(true),
      properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
    val stream = env.addSource(consumer)

    stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), (Int, ujson.Value)]] {

      override def map(node: ObjectNode): Either[(Exception, ObjectNode), (Int, ujson.Value)] = {
        logger.info("################## %s".format(node.get("metadata").toString))
        Thread.sleep(3000) // pause 3 seconds per record
        Right((200, writeJs(node.toString)))
      }
    }).print()
    env.execute("pp_convoy_flink")

Log while reading Kafka messages and completing a checkpoint:
==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log <==
 2019-02-28 04:46:22,330 INFO activity - ################## {"offset":1037252,"topic":"my_topic","partition":11}
==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.out <==
 Right((200,"MESSAGEEEEEE DETAIL"))
==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log <==
 2019-02-28 04:46:25,335 INFO activity - ################## {"offset":1037253,"topic":"my_topic","partition":11}
2019-02-28 04:46:25,394 INFO com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/70597bc7-728b-4c86-9488-2439f562fc98
==> application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log <==
 2019-02-28 04:46:25,443 INFO com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/_metadata
 2019-02-28 04:46:25,549 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 33 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5397 ms).
 2019-02-28 04:46:26,132 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 34 @ 1551329186078 for job 40dd9e3dba228623226fcf3bda0d1c0a.

Additional log, in case it helps:

Right after deploying the app:

==> container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:42:34,492 INFO org.apache.flink.yarn.YarnResourceManager - Registering TaskManager with ResourceID container_1551327455160_0002_01_000002 (akka.tcp://[hidden email]:44931/user/taskmanager_0) at ResourceManager
2019-02-28 04:42:34,693 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) switched from SCHEDULED to DEPLOYING.
2019-02-28 04:42:34,694 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (attempt #0) to ip-10-16-1-215
2019-02-28 04:42:35,247 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) switched from DEPLOYING to RUNNING.
2019-02-28 04:42:35,592 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1551328955432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,662 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task 850716ec4421c9e70852a0eba5975f01 of job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,663 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job 40dd9e3dba228623226fcf3bda0d1c0a.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) was not running
    at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-02-28 04:42:41,484 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1551328961432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:46,555 INFO com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-2/_metadata
2019-02-28 04:42:46,588 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5153 ms).