Count window on partition

Dmitry Golubets
Hi,

I'm looking for the right way to implement the following scheme:

1. Read data
2. Split it into partitions for parallel processing
3. In every partition, group the data into batches of N elements
4. Process these batches

My first attempt was: dataStream.keyBy(_.key).countWindow(..)
But countWindow groups by key. I, however, want to group all elements in a partition.

Then I tried: dataStream.keyBy(_.key).countWindowAll(..)
But apparently countWindowAll doesn't work on partitioned data.

So, my last version is:
dataStream.keyBy(_.key.hashCode % 4).countWindow(..)
But it looks kind of hacky with a hardcoded number of partitions.

So, what's the right way of doing it?

Best regards,
Dmitry
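
A note on the third version: on the JVM, % keeps the sign of the dividend, so hashCode % 4 is negative for keys with a negative hash code, and the expression can yield up to seven distinct values (-3 to 3) rather than four. A minimal sketch of a sign-safe bucket function follows; BucketKey, bucketOf, and numBuckets are hypothetical names that do not appear in the thread.

```scala
object BucketKey {
  // Hypothetical helper: maps any key to a bucket in the range 0 until numBuckets.
  // Math.floorMod never returns a negative result for a positive divisor,
  // unlike %, which keeps the sign of the dividend on the JVM.
  def bucketOf(key: Any, numBuckets: Int): Int =
    Math.floorMod(key.hashCode, numBuckets)
}
```

With such a helper, the key selector from the post would read keyBy(e => BucketKey.bucketOf(e.key, numBuckets)), and the bucket count becomes a single configurable value instead of a literal.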

Re: Count window on partition

Kostas Kloudas
Hi Dmitry,

In all cases, the result of countWindow() will also be grouped by key because of
the keyBy() that you are using.

If you want to have a non-keyed stream and then split it into count windows, remove
the keyBy() and use countWindowAll() instead of countWindow(). This operator will run
with parallelism 1, but you can then repartition your stream so that the downstream
operators have higher parallelism.

Hope this helps,
Kostas
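
The data flow described above can be modeled in plain Scala, without Flink, to see what the downstream operators would receive. This is only a sketch of the idea, not Flink code: GlobalBatches and both of its methods are hypothetical names, and round-robin is assumed as the repartitioning strategy.

```scala
object GlobalBatches {
  // Models a non-keyed count window: one sequential pass cuts the input
  // into consecutive batches of `size` elements. A trailing partial batch
  // is dropped here, on the assumption that a count window only fires
  // once it is full.
  def countWindowAll[T](input: Seq[T], size: Int): Seq[Seq[T]] =
    input.grouped(size).filter(_.size == size).toSeq

  // Models round-robin repartitioning: batch i goes to worker i % parallelism.
  def roundRobin[T](batches: Seq[Seq[T]], parallelism: Int): Map[Int, Seq[Seq[T]]] =
    batches.zipWithIndex
      .groupBy { case (_, i) => i % parallelism }
      .map { case (worker, indexed) => worker -> indexed.map(_._1) }
}
```

The single sequential pass in countWindowAll is the parallelism-1 step; only the work done on the finished batches is spread across workers.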

On Jan 23, 2017, at 11:05 AM, Dmitry Golubets wrote:
> [...]

Re: Count window on partition

Fabian Hueske-2
Hi Dmitry,

The third version is the way to go, IMO.
You might want to use a larger number of partitions if you plan to increase the parallelism of the job later.
Also note that it is not guaranteed that 4 keys are distributed uniformly across 4 tasks. It might happen that one task ends up with two keys and another with none.

If you want more control you can use partitionCustom (which does not produce a KeyedStream) and a stateful Map or FlatMap function to do the aggregation yourself.

Best, Fabian
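
The "do the aggregation yourself" option comes down to each parallel task keeping its own buffer and emitting it once it holds N elements. A minimal plain-Scala sketch of that buffering logic follows; Batcher is a hypothetical helper, not a Flink class, and the wiring into a FlatMap function is left out.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not a Flink class: holds the state one parallel
// task would keep to cut its own input into batches of `size` elements.
final class Batcher[T](size: Int) {
  require(size > 0, "batch size must be positive")
  private val buffer = ArrayBuffer.empty[T]

  // Adds one element; returns a full batch once `size` elements have
  // accumulated, otherwise None.
  def add(element: T): Option[Vector[T]] = {
    buffer += element
    if (buffer.size == size) {
      val batch = buffer.toVector
      buffer.clear()
      Some(batch)
    } else None
  }
}
```

In a real job this buffer would also have to be checkpointed as operator state to survive failures; that part is omitted here.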

2017-01-23 11:12 GMT+01:00 Kostas Kloudas:
> [...]