How to achieve exactly once on node failure using Kafka

How to achieve exactly once on node failure using Kafka

Y. Sakamoto
Hi,
I'm using Flink 1.2.0 and trying to achieve "exactly once" data transfer
from Kafka to Elasticsearch, but I cannot.
(Scala 2.11, Kafka 0.10, without YARN)

There are two Flink TaskManager nodes, and while processing
with a parallelism of 2, I shut one of them down (simulating a node failure).

Using flink-connector-kafka, I wrote the following code:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    StreamExecutionEnvironment env = StreamExecutionEnvironment
          .getExecutionEnvironment();
    env.enableCheckpointing(1000L);  // checkpoint every 1000 ms
    env.setParallelism(2);

    Properties kafkaProp = new Properties();
    kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092");
    kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181");
    kafkaProp.setProperty("group.id", "id");

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
          "topic", new SimpleStringSchema(), kafkaProp));

I found duplicated records in my map function.
Data processed since the last checkpoint before the node failure seems to be duplicated.

Is there any way to achieve "exactly once" on failure?


Thanks.
Yuichiro

Re: How to achieve exactly once on node failure using Kafka

Stephan Ewen
Hi!

Exactly-once end-to-end requires sinks that support that kind of behavior (typically some form of transaction support).

Kafka currently does not have the mechanisms in place to support exactly-once sinks, but the Kafka project is working on that feature.
For Elasticsearch, it is also not directly possible (because of missing transactions), but you can use Flink's state as the "authoritative" state (it is exactly-once) and then write changes to Flink's state to Elasticsearch. That way the writes to Elasticsearch become "idempotent", which means duplicates simply make no additional changes.
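
For illustration, here is a minimal sketch of such an idempotent write using the flink-connector-elasticsearch2 connector, applied to the DataStream from your code. The cluster name, index, type, and the way the document ID is derived are hypothetical placeholders, not something from this thread; in practice you would derive the ID from a real unique key in your records:

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
    import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
    import org.elasticsearch.client.Requests;

    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "my-cluster");   // hypothetical cluster name
    config.put("bulk.flush.max.actions", "1");

    List<InetSocketAddress> addresses = new ArrayList<>();
    addresses.add(new InetSocketAddress("192.168.97.42", 9300));

    stream.addSink(new ElasticsearchSink<>(config, addresses,
          new ElasticsearchSinkFunction<String>() {
              @Override
              public void process(String element, RuntimeContext ctx,
                      RequestIndexer indexer) {
                  Map<String, String> json = new HashMap<>();
                  json.put("data", element);

                  // A deterministic document ID makes the write idempotent:
                  // a replayed record simply overwrites the same document.
                  // hashCode() is only a placeholder; use a real record key.
                  indexer.add(Requests.indexRequest()
                        .index("my-index")   // hypothetical index name
                        .type("my-type")     // hypothetical type name
                        .id(String.valueOf(element.hashCode()))
                        .source(json));
              }
          }));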

Hope that helps!

Stephan




Re: How to achieve exactly once on node failure using Kafka

Y. Sakamoto
Thank you for your reply.

As I understand it, map/filter functions operate with "at least once" semantics when a failure occurs, so I need to write my code so that records are saved (overwritten) in Elasticsearch under the same ID even if duplicate data arrives. Is that correct?
(Sorry, I don't understand how to "write changes to Flink's state to Elastic".)

Regards,
Yuichiro


--
☆ ─────────────── ─ ─ - -
   Yuichiro SAKAMOTO
     [hidden email]
     [hidden email]
     http://phonypianist.sakura.ne.jp


Re: How to achieve exactly once on node failure using Kafka

rmetzger0
Hi,

Exactly. You have to make sure that you can write data for the same ID multiple times.
Exactly-once in Flink is only guaranteed for registered state. So if you have a flatMap() with a "counter" variable held in a ValueState, that counter will always be in sync with the number of elements in the Kafka topic (because the counter is restored from the last checkpoint on a failure).
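
As a minimal sketch of such a counter in registered state (the class and field names are hypothetical, and ValueState is keyed state, so it assumes a keyBy() before the flatMap):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class CountingFlatMap
            extends RichFlatMapFunction<String, Tuple2<String, Long>> {

        // Registered (keyed) state: Flink checkpoints it and restores it
        // on failure, so the count stays in sync with the records processed.
        private transient ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            counter = getRuntimeContext().getState(
                  new ValueStateDescriptor<>("counter", Long.class, 0L));
        }

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Long>> out)
                throws Exception {
            long updated = counter.value() + 1L;  // starts at the 0L default
            counter.update(updated);
            out.collect(Tuple2.of(value, updated));
        }
    }

    // Usage (hypothetical keying by the record itself):
    // stream.keyBy(value -> value).flatMap(new CountingFlatMap());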


Re: How to achieve exactly once on node failure using Kafka

Y. Sakamoto
Hi Robert,

Now it is clear to me.
Thanks!

Regards,
Yuichiro

