Exactly-once semantics in Flink Kafka Producer


Vasily Melnik

I'm trying to test Flink 1.8.0 exactly-once semantics with a Kafka source and sink:

  1. Run the Flink app, which simply transfers messages from one topic to another, with parallelism=1 and a checkpoint interval of 20 seconds.
  2. Generate messages with incrementing integer values every 2 seconds using a Python script (a sketch follows this list).
  3. Read the output topic with a console consumer at the read_committed isolation level.
  4. Manually kill the TaskManager.
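
A minimal Java sketch of the generator from step 2 (the original was a Python script; this equivalent is illustrative only: the broker address is an assumption, the topic name is taken from the Flink code below):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092"); // assumption
    props.setProperty("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        for (int i = 0; ; i++) {
            // One incrementing integer every 2 seconds
            producer.send(new ProducerRecord<>("stringTopic1", Integer.toString(i)));
            producer.flush();
            Thread.sleep(2000);
        }
    }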

I expect to see monotonically increasing integers in the output topic regardless of the TaskManager being killed and recovered.

But I actually see something unexpected in the console-consumer output:

32
33
34
35
36
37
38
39
40
-- TaskManager Killed
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45

It looks like all messages between checkpoints were replayed into the output topic. I also expected to see results in the output topic only after each checkpoint, i.e. every 20 seconds, but messages appeared in the output immediately as they were sent to the input.
Is this the expected behaviour, or am I doing something wrong?

Kafka version is 1.0.1 from Cloudera parcels. I tested a Kafka transactional producer and a read_committed console consumer with custom code, and it worked perfectly well, reading messages only after commitTransaction on the producer.
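
For reference, that standalone transactional test might look roughly like this (a sketch, not the original code; the broker address and transactional id are assumptions):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // assumption
    props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "manual-test");  // assumption
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("test", "1"));
        // A read_committed consumer sees the record only after this commit:
        producer.commitTransaction();
    }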

My Flink code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
// Checkpoint every 20 seconds in exactly-once mode
env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));

Properties producerProperty = new Properties();
producerProperty.setProperty("bootstrap.servers", ...);
producerProperty.setProperty("zookeeper.connect", ...);
producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");
producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction");
producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

Properties consumerProperty = new Properties();
consumerProperty.setProperty("bootstrap.servers", ...);
consumerProperty.setProperty("zookeeper.connect", ...);
consumerProperty.setProperty("group.id", "test2");

FlinkKafkaConsumer<String> consumer1 =
        new FlinkKafkaConsumer<>("stringTopic1", new ComplexStringSchema(), consumerProperty);
consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());

// Transactional sink: records are committed when a checkpoint completes
FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<>(
        "test",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProperty,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
producer1.ignoreFailuresAfterTransactionTimeout();

DataStreamSource<String> s1 = env.addSource(consumer1);
s1.addSink(producer1);
env.execute("Test");


Re: Exactly-once semantics in Flink Kafka Producer

Eduardo Winpenny Tejedor
Hi Vasily,

You're probably executing this from your IDE or from a local Flink cluster without starting your job from a checkpoint.

When you start your Flink job for the second time, you need to specify the path to the latest checkpoint as an argument; otherwise Flink will start from scratch.

You're probably thinking that's not great; ideally Flink should be able to continue automatically from the last completed checkpoint, and that's in fact what the docs say. However, that only applies when you're running in a proper cluster environment: Flink can recover from checkpoints when only part of the cluster fails, not when the whole job is stopped. For a full stop you need to specify the checkpoint manually.
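
For example, something like this (illustrative only: the job ID, checkpoint number and jar name are placeholders, the base path is the state backend path from your code, and it assumes the checkpoint files were retained):

    flink run -s hdfs:///checkpoints-data/<job-id>/chk-<n>/ my-job.jar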

Hope that helps!



Re: Exactly-once semantics in Flink Kafka Producer

Vasily Melnik
Hi, Eduardo.
Maybe I should describe the experiment design more precisely:
1) I run Flink on YARN (YARN session mode).
2) I do not stop/cancel the application, I just kill the TaskManager process.
3) After that, YARN creates another TaskManager process and an automatic checkpoint restore from HDFS happens.

That's why I expect to see a correct restore.

Best regards,
Vasily Melnik

GlowByte Consulting
===================
Mob.: +7 (903) 101-43-71
[hidden email]




Re: Exactly-once semantics in Flink Kafka Producer

Maxim Parkachov
Hi Vasily,

As far as I know, by default the console consumer reads uncommitted messages.
Try setting isolation.level to read_committed in the console-consumer properties.
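
For reference, a programmatic consumer sets the same option like this (a sketch; the broker address and group id are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "probe");                // illustrative
    // Skip records from aborted or still-open transactions:
    props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");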

Hope this helps,
Maxim.


Re: Exactly-once semantics in Flink Kafka Producer

Vasily Melnik
Hi, Maxim.
My console-consumer command:
    kafka-console-consumer --zookeeper ... --topic test --from-beginning --isolation-level read_committed
It works perfectly well with a manually written Kafka producer: it reads data only after commitTransaction.


Re: Exactly-once semantics in Flink Kafka Producer

moe_hoss
In reply to this post by Vasily Melnik

Hi Vasily,

I haven't tested state recovery under a YARN setup. But in the case of a standalone Flink cluster, I needed to run the application with the proper checkpoint recovery directory (whose name starts with 'chk-') passed as the -s parameter value. This was the only way I could recover my application state after an ungraceful shutdown.

Hope that helps, and I would be glad if someone could suggest a better solution.

BR, Moe

--
Mohammad Hosseinian
Software Developer
Information Design One AG