Duplicates in Flink

7 messages

Duplicates in Flink

Rico Bergmann
Hi!

I still have an issue... I am now using 0.9.1 and the new
KafkaConnector, but I still get duplicates in my Flink program. Here's the
relevant part:

    final FlinkKafkaConsumer082<String> kafkaSrc =
        new FlinkKafkaConsumer082<String>(kafkaTopicIn, new SimpleStringSchema(), properties);

    DataStream<String> start = env.addSource(kafkaSrc)
        .setParallelism(numReadPartitions); // numReadPartitions = 2

    DataStream<JSONObject> jsonized = start
        .flatMap(new ExtractAndFilterJSON());

    DataStream<Session> sessions = jsonized
        .partitionByHash(new KeySelector<JSONObject, String>() {
            /**
             * Partition by session id.
             */
            @Override
            public String getKey(JSONObject value) throws Exception {
                try {
                    return /* session id */;
                } catch (Exception e) {
                    LOG.error("no session could be retrieved", e);
                }
                return "";
            }
        })
        .flatMap(new StatefulSearchSessionizer());

In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
sure that the Kafka topic I'm reading from does not contain any
duplicates, so they must be introduced somewhere in the Flink program...

Any ideas?

Cheers, Rico.

Re: Duplicates in Flink

Aljoscha Krettek
Hi Rico,
unfortunately the 0.9 branch still seems to have problems with exactly-once processing and checkpointed operators. We reworked how checkpoints are handled for the 0.10 release, so it should work well there.

Could you maybe try running on the 0.10-SNAPSHOT release and see if the problems persist there?

Cheers,
Aljoscha

Re: Duplicates in Flink

Rico Bergmann
Hi!

Testing it with the current 0.10 snapshot is not easily possible at the moment.

But I deactivated checkpointing in my program and still get duplicates in my output. So it does not seem to come only from the checkpointing feature, does it?
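
(For reference, a minimal sketch of how checkpointing is switched on or off on the StreamExecutionEnvironment; this is not taken from the actual job, and the interval value is only illustrative:)

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointToggleSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpointing is off by default; the job only draws checkpoints
            // (and replays records from the last one after a failure) when this
            // is called. Leaving the call out is what "deactivated" amounts to.
            env.enableCheckpointing(5000); // illustrative 5 second interval

            // ... define the topology and call env.execute(...) as usual ...
        }
    }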

Maybe the KafkaSink is responsible for this? (Just my guess.)

Cheers, Rico.
Re: Duplicates in Flink

Stephan Ewen
Do you mean the KafkaSource?

Which Kafka source are you using, the 0.9.1 FlinkKafkaConsumer082 or the KafkaSource?


Re: Duplicates in Flink

Rico Bergmann
No. I mean the KafkaSink. 

A bit more insight into my program: I read from a Kafka topic with FlinkKafkaConsumer082, then hash-partition the data, then I do a deduplication (which does not eliminate all duplicates, though). Then some computation, and afterwards again a deduplication (grouping by message in a window over the last 2 seconds).

Of course the last deduplication is not perfect.
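
For reference, a minimal sketch of such a best-effort, time-bounded deduplication step (not the actual code from this job; the class name is made up, it keys on the record's string form, and it keeps its seen-map in plain operator memory, so it is neither checkpointed nor bounded in size):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    /** Drops records whose value was already seen within the last maxAgeMs milliseconds. */
    public class TimeBoundedDeduplicator implements FlatMapFunction<String, String> {

        private final long maxAgeMs;
        private final Map<String, Long> lastSeen = new HashMap<String, Long>();

        public TimeBoundedDeduplicator(long maxAgeMs) {
            this.maxAgeMs = maxAgeMs;
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            long now = System.currentTimeMillis();

            // Evict entries that fell out of the deduplication window.
            Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
            while (it.hasNext()) {
                if (now - it.next().getValue() > maxAgeMs) {
                    it.remove();
                }
            }

            // Forward the record only if it was not seen within the window.
            if (!lastSeen.containsKey(value)) {
                lastSeen.put(value, now);
                out.collect(value);
            }
        }
    }

Applied after the hash partitioning, e.g. stream.flatMap(new TimeBoundedDeduplicator(2000)), so that each parallel instance only ever sees its own share of the keys.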

Cheers, Rico.

Re: Duplicates in Flink

Stephan Ewen
Can you tell us where the KafkaSink comes into play? At what point do the duplicates come up?


Re: Duplicates in Flink

Rico Bergmann
The KafkaSink is the last step in my program after the 2nd deduplication. 

I could not yet track down where the duplicates show up. That's a bit difficult to find out, but I'm trying.
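
One way to narrow it down (just a debugging sketch, not something from the actual job): put a pass-through flatMap after each stage that logs a record the second time it sees it, and check after which stage the warnings first appear.

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /** Pass-through probe that logs records it has already seen once before. */
    public class DuplicateProbe implements FlatMapFunction<String, String> {

        private static final Logger LOG = LoggerFactory.getLogger(DuplicateProbe.class);

        private final String stageName;
        private final Set<String> seen = new HashSet<String>();

        public DuplicateProbe(String stageName) {
            this.stageName = stageName;
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            // Set.add returns false if the element was already present.
            if (!seen.add(value)) {
                LOG.warn("duplicate after stage {}: {}", stageName, value);
            }
            out.collect(value); // always forward; the probe only observes
        }
    }

Note that each parallel subtask keeps its own seen-set (which also grows without bound), so the probe should either run with parallelism 1 or sit behind the same hash partitioning as the stage it is checking.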


