Idle windows


Ustinov Anton
I have a simple job that reads JSON messages from a Kafka topic and processes them like this:

SingleOutputStreamOperator<Integer> result = ds
        .filter(ev -> ev.has(cookieFieldName))
        .map(ev -> ev.get(cookieFieldName).asText())
        .keyBy(new CookieKeySelector(env.getParallelism()))
        .timeWindow(Time.seconds(period))
        .aggregate(new CookieAggregate())
        .timeWindowAll(Time.seconds(period))
        .reduce((v1, v2) -> v1 + v2);

CookieKeySelector computes the MD5 hash of the cookie value and takes the remainder of dividing it by the job parallelism. CookieAggregate counts unique cookie values in a window. In the Flink Dashboard I see that only half of the windows receive messages to process. The number of working windows depends on the job parallelism. Why do only some of the windows compute useful aggregates? I’ve tried using random numbers as the key and still get the same result.
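
For reference, the key derivation described above can be sketched in plain Java roughly as follows. This is only an illustration, not the actual CookieKeySelector: the class name CookieKeySketch and the method keyFor are hypothetical, and it assumes the MD5 digest is read as an unsigned integer before taking the remainder.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical stand-in for the key logic described in the post.
final class CookieKeySketch {

    // Returns MD5(cookie) mod parallelism, so keys fall in [0, parallelism).
    static int keyFor(String cookie, int parallelism) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(cookie.getBytes(StandardCharsets.UTF_8));
            // Assumption: interpret the 16-byte digest as a non-negative integer.
            BigInteger value = new BigInteger(1, digest);
            return value.mod(BigInteger.valueOf(parallelism)).intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        int parallelism = 8; // example value, not taken from the job
        for (String cookie : new String[] {"cookie-a", "cookie-b", "cookie-c"}) {
            System.out.println(cookie + " -> key " + keyFor(cookie, parallelism));
        }
    }
}
```

With this scheme the job only ever sees as many distinct keys as its parallelism, no matter how many distinct cookies arrive.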

Additional information: Flink 1.8.0, runs on a single node with 56 CPUs, 256G RAM, 10GB/s network.


Anton Ustinov
[hidden email]

Re: Idle windows

Hequn Cheng
Hi Ustinov,

I guess you have mixed up the remainder with the index of the parallel task, i.e., a record whose key has remainder 0 will not necessarily be processed by the 0th task after keyBy.
Flink applies its own hash function to the key you provide and partitions the records based on the key group range. With only as many distinct keys as the parallelism, several keys can end up in the same task while other tasks receive nothing.

keyBy makes sure that records with the same key go to the same place. If you want to balance the workload, you need more distinct keys.
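
To make this concrete, here is a rough, self-contained simulation of the assignment. As far as I understand it, Flink computes a key group as hash(key.hashCode()) % maxParallelism and then maps it to a task as keyGroup * parallelism / maxParallelism. The sketch below follows that shape, but the mix function is only a murmur-style stand-in for Flink's internal hash, the class KeyGroupSketch and its methods are hypothetical names, and the max parallelism of 128 is an assumed value.

```java
import java.util.HashSet;
import java.util.Set;

// Simulation of key -> key group -> task index; not Flink code.
final class KeyGroupSketch {

    // Murmur-style integer mixer, a stand-in for Flink's internal hash.
    static int mix(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        return code & Integer.MAX_VALUE; // clear the sign bit
    }

    // Maps a key to a key group, then the key group to a task index.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = mix(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many tasks receive at least one of the keys 0..distinctKeys-1.
    static int busySubtasks(int distinctKeys, int maxParallelism, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (int k = 0; k < distinctKeys; k++) {
            used.add(subtaskFor(k, maxParallelism, parallelism));
        }
        return used.size();
    }

    public static void main(String[] args) {
        int parallelism = 8;      // example value
        int maxParallelism = 128; // assumed value
        System.out.println("busy tasks with " + parallelism + " keys: "
                + busySubtasks(parallelism, maxParallelism, parallelism));
        System.out.println("busy tasks with 10000 keys: "
                + busySubtasks(10000, maxParallelism, parallelism));
    }
}
```

With only as many keys as tasks, collisions are likely and some tasks stay idle. With many distinct keys (for example the cookie value itself) every task should get a share of the data.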

Best, Hequn


On Fri, Jun 21, 2019 at 6:23 PM Ustinov Anton <[hidden email]> wrote:
> I have a simple job that reads JSON messages from a Kafka topic and processes them like this: [...]