Problem when using the Kafka sink with EXACTLY_ONCE in IDEA



Kaibo Zhou
Hi,
I encountered an error while running the Kafka sink demo in IntelliJ IDEA.

This is the complete code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object kafka_test {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoint"))

    val config = env.getCheckpointConfig
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    config.setCheckpointInterval(15 * 1000)

    val event = env.socketTextStream("localhost", 9999)
    val propsTarget = new Properties()
    propsTarget.setProperty("bootstrap.servers", "127.0.0.1:9092")
    propsTarget.setProperty("enable.idempotence", "true")

    val outputProducer = new FlinkKafkaProducer011[String](
      "test-output",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      propsTarget,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // works when changed to Semantic.AT_LEAST_ONCE
    )

    event.addSink(outputProducer).name("sink_to_kafka")
    env.execute()
  }
}

Run "nc -l 9999" before starting the code above.
The error message is:

7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.11.0.2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : 73be1e1168f91ee2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011  - Starting FlinkKafkaProducer (1/1) to produce into default topic test-output
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId Source: Socket Stream -> Sink: sink_to_kafka-7df19f87deec5680128845fd9a6ca18d-6] ProducerId set to -1 with epoch -1
7199 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3).
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) [FAILED]
7201 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task and sending final execution state FAILED to JobManager for task Source: Socket Stream -> Sink: sink_to_kafka a7cea618f99152987bb4a52b4d1df0e3.
7201 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7201 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state RUNNING to FAILING.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7202 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Discarding the results produced by task execution a7cea618f99152987bb4a52b4d1df0e3.
7202 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Try to restart or fail the job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) if no longer possible.
7202 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state FAILING to RESTARTING.

When I change the code to FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE, everything works.

This problem is easy to reproduce. Am I using the Kafka sink with exactly-once semantics incorrectly, or is this a bug?

Cheers





Re: Problem when using the Kafka sink with EXACTLY_ONCE in IDEA

Till Rohrmann
Hi Kaibo,

which Kafka version are you running locally? Exactly-once processing guarantees require a Kafka broker of version 0.11 or later. The UnsupportedVersionException indicates that this constraint is not fulfilled [1].
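One way to act on this before the job starts is a pre-flight version check. Note that the "Kafka version : 0.11.0.2" line in the log above is printed by the kafka-clients library bundled with FlinkKafkaProducer011, not by the broker, so the client side already meets the requirement and an older broker is the likely culprit. The following is a minimal sketch under stated assumptions: the object KafkaVersionCheck and its methods are hypothetical (not part of Flink or Kafka), and the broker version string is assumed to be obtained by hand, for example from the "Kafka version" line the broker logs at startup.

```scala
// Hypothetical pre-flight check (not a Flink or Kafka API): decides whether a
// broker version string is new enough for Kafka transactions, which the
// EXACTLY_ONCE semantic of FlinkKafkaProducer011 relies on.
object KafkaVersionCheck {

  // Transactions (and FindCoordinator v1) first shipped with Kafka 0.11.
  private val MinMajor = 0
  private val MinMinor = 11

  /** Parses "major.minor[.patch...]" into (major, minor), or None if malformed. */
  def parseMajorMinor(version: String): Option[(Int, Int)] =
    version.trim.split("\\.").toList match {
      case major :: minor :: _ =>
        try Some((major.toInt, minor.takeWhile(_.isDigit).toInt))
        catch { case _: NumberFormatException => None }
      case _ => None
    }

  /** True if the broker version is at least 0.11; malformed input yields false. */
  def supportsTransactions(brokerVersion: String): Boolean =
    parseMajorMinor(brokerVersion).exists { case (major, minor) =>
      major > MinMajor || (major == MinMajor && minor >= MinMinor)
    }

  /** Throws IllegalArgumentException with a readable message if too old. */
  def requireTransactions(brokerVersion: String): Unit =
    require(
      supportsTransactions(brokerVersion),
      s"Kafka broker $brokerVersion does not support transactions; " +
        "EXACTLY_ONCE needs a broker of version 0.11 or later"
    )

  def main(args: Array[String]): Unit =
    for (v <- Seq("0.10.2.1", "0.11.0.2", "1.0.0", "2.1.0"))
      println(s"$v -> ${supportsTransactions(v)}")
}
```

Calling KafkaVersionCheck.requireTransactions(brokerVersion) before constructing the FlinkKafkaProducer011 makes a misconfigured setup fail immediately with a readable message, instead of sending the job into the FAILING/RESTARTING cycle shown in the log. Failing fast is deliberate: silently falling back to AT_LEAST_ONCE would weaken the delivery guarantee without anyone noticing.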


Cheers,
Till

On Wed, Jan 2, 2019 at 5:02 AM, Kaibo Zhou wrote: (original post above; quoted text omitted)