Hi!
I still have an issue... I'm now using 0.9.1 and the new Kafka connector, but I still get duplicates in my Flink program. Here's the relevant part:

    final FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
            kafkaTopicIn, new SimpleStringSchema(), properties);

    DataStream<String> start = env.addSource(kafkaSrc)
            .setParallelism(numReadPartitions); // numReadPartitions = 2

    DataStream<JSONObject> jsonized = start
            .flatMap(new ExtractAndFilterJSON());

    DataStream<Session> sessions = jsonized
            .partitionByHash(new KeySelector<JSONObject, String>() {
                /**
                 * partition by session id
                 */
                @Override
                public String getKey(JSONObject value) throws Exception {
                    try {
                        return /* session id */;
                    } catch (Exception e) {
                        LOG.error("no session could be retrieved", e);
                    }
                    return "";
                }
            })
            .flatMap(new StatefulSearchSessionizer());

In the StatefulSearchSessionizer I receive duplicates sporadically. I'm sure that the Kafka topic I'm reading from does not contain any duplicates, so it must happen in the Flink program. Any ideas? Cheers, Rico.
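For context, a sessionizer like the StatefulSearchSessionizer presumably keeps its open sessions as operator state. Here is a rough, hypothetical sketch (not the actual implementation; it assumes Flink 0.9's Checkpointed interface and reuses the JSONObject and Session types from the pipeline above):

    import java.util.HashMap;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    import org.apache.flink.util.Collector;

    // Hypothetical sketch, not the real StatefulSearchSessionizer.
    public class StatefulSearchSessionizer extends RichFlatMapFunction<JSONObject, Session>
            implements Checkpointed<HashMap<String, Session>> {

        // open sessions, keyed by session id
        private HashMap<String, Session> openSessions = new HashMap<String, Session>();

        @Override
        public void flatMap(JSONObject event, Collector<Session> out) throws Exception {
            // update the session belonging to this event's session id and
            // emit the Session once it is considered complete
        }

        @Override
        public HashMap<String, Session> snapshotState(long checkpointId, long checkpointTimestamp) {
            return openSessions;
        }

        @Override
        public void restoreState(HashMap<String, Session> state) {
            openSessions = state;
        }
    }

After a failure, Flink replays everything since the last completed checkpoint, so a stateful operator like this is exactly where duplicates from a not-yet-exactly-once source become visible.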
Hi Rico, unfortunately the 0.9 branch still seems to have problems with exactly-once processing and checkpointed operators. We reworked how the checkpoints are handled for the 0.10 release, so it should work well there. Could you maybe try running on the 0.10-SNAPSHOT release and see if the problems persist there? Cheers, Aljoscha
Hi! Testing it with the current 0.10 snapshot is not easily possible at the moment. But I deactivated checkpointing in my program and still get duplicates in my output. So it seems not to come only from the checkpointing feature, right? Maybe the KafkaSink is responsible for this? (Just my guess.) Cheers, Rico.
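For reference, deactivating checkpointing amounts to nothing more than leaving out one call on the environment. A minimal sketch, assuming the usual 0.9 job setup (class and job names here are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SessionizerJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // with checkpointing: draw a snapshot every 5 seconds
            // env.enableCheckpointing(5000);

            // deactivated for this test: simply don't call enableCheckpointing(),
            // so no checkpoints are taken and no state is restored after failures

            // ... build the topology as in the first mail ...

            env.execute("sessionizer job");
        }
    }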
Do you mean the KafkaSource? Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the KafkaSource?
No, I mean the KafkaSink. A bit more insight into my program: I read from a Kafka topic with FlinkKafkaConsumer082, then hash-partition the data, then I do a deduplication (which does not eliminate all duplicates, though). Then some computation, and afterwards again a deduplication (grouping by message in a window over the last 2 seconds). Of course the last deduplication is not perfect. Cheers, Rico.
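Conceptually, such a windowed deduplication can be sketched as a stateful flatMap. A simplified, hypothetical version keyed on the full message string (in a real job the state would have to be bounded and fault-tolerant):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical sketch: drop messages already seen within the last 2 seconds.
    public class Deduplicator extends RichFlatMapFunction<String, String> {

        private final Map<String, Long> lastSeen = new HashMap<String, Long>();

        @Override
        public void flatMap(String message, Collector<String> out) throws Exception {
            long now = System.currentTimeMillis();

            // evict entries older than the 2-second window
            Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
            while (it.hasNext()) {
                if (now - it.next().getValue() > 2000L) {
                    it.remove();
                }
            }

            // forward the message only if it was not seen within the window
            if (!lastSeen.containsKey(message)) {
                lastSeen.put(message, now);
                out.collect(message);
            }
        }
    }

Note that this state is local to each parallel subtask, so duplicates that get routed to different instances slip through; that matches the observation that the deduplication is not perfect.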
Can you tell us where the KafkaSink comes into play? At what point do the duplicates come up?
The KafkaSink is the last step in my program, after the 2nd deduplication. I could not yet track down where the duplicates show up. That's a bit difficult to find out... But I'm trying to find it...
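For completeness, the sink step itself is just the standard pattern, roughly like this (the variable names and the exact KafkaSink constructor arguments are assumptions; check the signature of the 0.9.x Kafka connector actually on the classpath):

    // Sketch only: constructor arguments are assumed, not verified
    // against a specific 0.9.x connector release.
    deduplicated.addSink(new KafkaSink<String>(
            brokerList,        // e.g. "localhost:9092" (assumption)
            kafkaTopicOut,     // output topic, name assumed
            new SimpleStringSchema()));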