Kafka and Flink's partitions


Kafka and Flink's partitions

rss rss
Hello,

  I want to implement a processing scheme like the one shown on the following diagram: a calculation of the number of unique users per specified time interval, under the assumption that we have > 100k events per second and > 100M unique users:

 [diagram of the processing scheme; image not preserved in this archive]

 I have one Kafka topic of events with a custom partitioner that assigns partitions by hash(userId) % partitionsNum: https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java. I have also prepared a runnable example: https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/FlinkStreamingConsumer.java
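
 For illustration, a minimal sketch of what such a partitioner can look like, assuming the new Kafka producer Partitioner API (the actual KafkaPartitioner in the repository may use an older interface and differ in details):

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    // Sketch only: route all events of one user to the same Kafka partition.
    public class UserIdPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int partitionsNum = cluster.partitionsForTopic(topic).size();
            // the record key is expected to be the userId string
            return Math.floorMod(key.hashCode(), partitionsNum);
        }

        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public void close() {}
    }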

 The whole project is available at https://github.com/rssdev10/flink-kafka-streaming/ ; see that page for how to run the data generator and the test.

 Basic assumption: to count unique identifiers I have to keep them in memory in a Set<String>, but that data structure grows to dozens of GB. So I need to partition the data by identifier to keep each partial set small, and then collect only the already-computed counts per specified time interval, e.g. every hour.
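
 (A very rough back-of-envelope check, assuming UUID-length string identifiers: one entry in a java.util.HashSet<String> costs on the order of 150-200 bytes once the String object, its character array and the hash-table entry are counted, so 100M unique ids already amount to roughly 15-20 GB in a single set; hence the need to split the id space across partitions.)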

 Questions:
  1. The logic of Flink is quite hidden. The window operator requires a keyed stream. Does it mean that when I do
    eventStream.keyBy(event -> event.partition(partNum));
    with the same partitioner that is used for Kafka, Flink preserves the original partitions? I want to avoid any repartitioning.
  2. Then I do:

    WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
        userIdKeyed.timeWindow(Time.seconds(windowDurationTime));

    DataStream<ProductAggregator> uniqUsers = uniqUsersWin
        .trigger(ProcessingTimeTrigger.create())
        .fold(new UniqAggregator(), (FoldFunction<Event, UniqAggregator>) (accumulator, value) -> {
            accumulator.uniqIds.add(value.getUserId());
            accumulator.registerEvent(value);
            return accumulator;
        });

    Does this mean that I have only one partition?
  3. Next, I want to collect the partial aggregation results. I am using a custom trigger https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/CountOrTimeTrigger.java which fires either when the number of collected partial aggregates matches the number of Kafka partitions, or after an emergency timeout if not enough aggregates have arrived. The aggregation code is:
    AllWindowedStream<ProductAggregator, TimeWindow> combinedUniqNumStream =
        uniqUsers
            .timeWindowAll(Time.seconds(emergencyTriggerTimeout))
            .trigger(PurgingTrigger.of(CountOrTimeTrigger.of(partNum)));

    combinedUniqNumStream
        .fold(new ProductAggregator(),
            (FoldFunction<ProductAggregator, ProductAggregator>) (accumulator, value) -> {
                accumulator.value += value.value;
                accumulator.summarize(value);
                return accumulator;
            });
    But sometimes I see an incorrect number of unique identifiers, probably because the partial aggregates get skewed across firings. The test generates no more than 1000 identifiers; the problem is visible when the test is run after preloading messages into Kafka.


PS: I found some information at http://data-artisans.com/kafka-flink-a-practical-how-to/ and https://www.elastic.co/blog/building-real-time-dashboard-applications-with-apache-flink-elasticsearch-and-kibana but unfortunately these articles don't explain how to build the scheme described above.


Cheers




Re: Kafka and Flink's partitions

rmetzger0
Hi rss,

Concerning your questions:
1. There is currently no way to avoid the repartitioning. When you do a keyBy(), Flink will shuffle the data through the network. What you would need is a way to tell Flink that the data is already partitioned. If you used keyed state, you would also need to ensure that the same hash function is used for the Kafka partitions and for the state.
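
For example (sketch only, reusing the Event/getUserId types from your project), keying directly by the user id lets Flink's own hash partitioning spread the per-key state across the parallel window tasks, at the price of exactly this shuffle after the Kafka source:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // Sketch: Flink hash-partitions the stream by userId, so every parallel
    // window task sees a disjoint subset of users, independent of the Kafka partitioning.
    KeyedStream<Event, String> userIdKeyed = eventStream
        .keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event event) {
                return event.getUserId();
            }
        });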

2. Why do you assume that this would end up in one partition?

3. You can also read old messages from a Kafka topic by setting "auto.offset.reset" to "smallest" (or "largest") and using a new "group.id".
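
A sketch of that configuration with the Kafka 0.8 connector (the topic name and EventDeserializationSchema below are placeholders, not the project's actual names; the 0.9+ consumer uses "earliest"/"latest" instead):

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("zookeeper.connect", "localhost:2181");    // needed by the 0.8 consumer
    props.setProperty("group.id", "uniq-users-" + System.currentTimeMillis()); // fresh group id per run
    props.setProperty("auto.offset.reset", "smallest");           // start from the oldest retained messages

    // EventDeserializationSchema stands in for the project's DeserializationSchema<Event>
    FlinkKafkaConsumer08<Event> consumer =
        new FlinkKafkaConsumer08<>("events", new EventDeserializationSchema(), props);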

I'll add Aljoscha and Kostas to the email. Maybe they can help with the incorrect results of the windowing.

Regards,
Robert



Re: Kafka and Flink's partitions

rss rss
Hello, thanks for the answer.

> 1. There is currently no way to avoid the repartitioning. When you do a keyBy(), Flink will shuffle the data through the network. What you would need is a way to tell Flink that the data is already partitioned. If you used keyed state, you would also need to ensure that the same hash function is used for the Kafka partitions and for the state.

Is this only an assumption, or do examples exist? Yesterday I posted a question about the incompatibility of Flink's keyed serializer with Kafka's deserializer.

> 2. Why do you assume that this would end up in one partition?

It was just an assumption; I don't know how to check it.
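
Perhaps one way to check would be to dump the streaming execution plan as JSON and load it into the Flink plan visualizer, which shows how the operators are connected and with what parallelism (sketch only):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build the topology as in FlinkStreamingConsumer ...
    System.out.println(env.getExecutionPlan());   // JSON description of the job graph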

> 3. You can also read old messages from a Kafka topic by setting "auto.offset.reset" to "smallest" (or "largest") and using a new "group.id".

OK, I know about that. But "smallest" is mainly a way to repeat the test with the same data.


My general question: is the implementation in https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/FlinkStreamingConsumer.java appropriate for the scheme from the first email?


Regarding the example: this small self-contained test is based on DIMA's project, and you are welcome to use it in this form if it is useful.


Thanks,
best regards.
