DataSet: combineGroup/reduceGroup with large number of groups

Urs Schoenenberger
Hi,

I'm working on a batch job (roughly 10 billion records of input, 10
million groups) that is essentially a 'fold' over each group, that is, I
have a function

AggregateT addToAggregate(AggregateT agg, RecordT record) {...}

and want to fold this over each group in my DataSet.

My understanding is that I cannot use .groupBy(0).reduce(...) since the
ReduceFunction only supports the case where AggregateT is the same as
RecordT.
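To make the type mismatch concrete: Flink's `ReduceFunction<T>` declares `T reduce(T value1, T value2)`, so both inputs and the output share one type, whereas the fold above maps `(AggregateT, RecordT)` to `AggregateT`. The plain-Java sketch below (no Flink dependency) folds a single group using a hypothetical `AggregateT` named `Stats` (count, sum, and maximum over `double` records). The class, its fields, and the record type are illustrative assumptions, not part of the original job.

```java
// Hypothetical AggregateT: running statistics over double-valued records (RecordT = double).
final class Stats {
    final long count;
    final double sum;
    final double max;

    Stats(long count, double sum, double max) {
        this.count = count;
        this.sum = sum;
        this.max = max;
    }

    // Neutral starting aggregate for an empty group.
    static Stats empty() {
        return new Stats(0, 0.0, Double.NEGATIVE_INFINITY);
    }

    // The fold step: (AggregateT, RecordT) -> AggregateT, where the two types differ.
    static Stats addToAggregate(Stats agg, double record) {
        return new Stats(agg.count + 1, agg.sum + record, Math.max(agg.max, record));
    }

    // Folds all records of one group, starting from the empty aggregate.
    static Stats foldGroup(Iterable<Double> group) {
        Stats agg = empty();
        for (double record : group) {
            agg = addToAggregate(agg, record);
        }
        return agg;
    }
}
```

A `ReduceFunction<Stats>` could only combine two `Stats` values; it has no place to accept a raw `double` record.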

A simple solution using .reduceGroup(...) works, but spills all input
data in the reduce step, which causes a lot of slow and expensive disk I/O.

Therefore, we tried using .combineGroup(...).reduceGroup(...), but
experienced a similar amount of spilling. Checking the source of the
*Combine drivers, it seems that they accumulate events in a buffer, sort
the buffer by key, and combine adjacent records in the same group. This
does not work in my case because of the large number of groups: the records
in the buffer most likely all belong to different groups. The "combine"
phase therefore becomes a no-op that merely turns each RecordT into an
AggregateT, and the reduce phase still has 10 billion AggregateTs to combine.
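The effect described above can be reproduced with a small plain-Java model. It is a simplification, not Flink's actual combine driver (which fills a fixed amount of managed memory rather than a fixed number of records): the hypothetical `combinedOutputs` method sorts fixed-size buffers of keys and counts one output record per distinct key per buffer.

```java
import java.util.Arrays;

// Simplified model of a sort-based combiner: buffer, sort by key, merge adjacent equal keys.
final class SortCombineModel {
    private SortCombineModel() {}

    // Returns how many records leave the combiner when only `bufferSize` records fit at once.
    static long combinedOutputs(long[] keys, int bufferSize) {
        long outputs = 0;
        for (int start = 0; start < keys.length; start += bufferSize) {
            int end = Math.min(start + bufferSize, keys.length);
            long[] buffer = Arrays.copyOfRange(keys, start, end);
            Arrays.sort(buffer); // sort the buffer by key
            for (int i = 0; i < buffer.length; i++) {
                // Adjacent equal keys collapse into a single combined record.
                if (i == 0 || buffer[i] != buffer[i - 1]) {
                    outputs++;
                }
            }
        }
        return outputs;
    }
}
```

With keys cycling through far more groups than fit into one buffer (for example `keys[i] = i % 1000` with a buffer of 100 records), every record leaves the combiner uncombined; with only 10 groups, the output shrinks tenfold. As a rough estimate, assuming uniformly random keys and a hypothetical buffer of one million records, a buffer drawn from 10 million groups contains about 10^7 * (1 - e^(-0.1)), or roughly 950,000 distinct keys, so about 95% of the records survive the combine step.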

Is there a way of modelling this computation efficiently with the
DataSet API? Alternatively, can I turn this into a DataStream job? (The
implementation there would simply be a MapFunction on a KeyedStream with
the AggregateT residing in keyed state, although I don't know how I
would emit this state at the end of the data stream only.)

Thanks,
Urs

--
Urs Schönenberger
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: DataSet: combineGroup/reduceGroup with large number of groups

Fabian Hueske-2
Hi Urs,

On the DataSet API, the only memory-safe way to do this is a GroupReduceFunction.
As you observed, this requires a full sort of the data set, which can be quite expensive, but after the sort the computation is streamed.
You could also try to manually implement a hash-based combiner using a MapPartitionFunction. The function would keep a HashMap on the key with a fixed maximum size that has to be tuned manually.
When you have to insert a new record into the HashMap but it has reached its maximum size, you first have to evict an entry (emitting its partial aggregate). Since all of this happens on the heap, it won't be memory-safe and might fail with an OOME.
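A minimal sketch of such a bounded hash-based combiner, written as plain Java so it runs without Flink. The loop corresponds to what the body of `MapPartitionFunction#mapPartition(Iterable<IN>, Collector<OUT>)` would do, with the `Collector` modelled by a `BiConsumer`. The least-recently-used eviction policy (a `LinkedHashMap` in access order), the class name, and the `zero`/`addToAggregate` parameters are assumptions for illustration. Because an evicted key can be re-inserted later, the same key may be emitted several times, so the output still has to be grouped by key and merged downstream, which assumes AggregateT has a merge function.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: hash-based pre-aggregation of one partition with a bounded map.
final class BoundedHashCombiner {
    private BoundedHashCombiner() {}

    static <K, R, A> void combine(Iterable<R> records, int maxSize,
                                  Function<R, K> keyOf, Supplier<A> zero,
                                  BiFunction<A, R, A> addToAggregate,
                                  BiConsumer<K, A> out) {
        // accessOrder = true: the eldest entry is the least recently used key.
        Map<K, A> cache = new LinkedHashMap<K, A>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, A> eldest) {
                if (size() > maxSize) {
                    out.accept(eldest.getKey(), eldest.getValue()); // emit the partial aggregate
                    return true;                                    // then evict it
                }
                return false;
            }
        };
        for (R record : records) {
            K key = keyOf.apply(record);
            A agg = cache.get(key); // assumes aggregates are never null
            cache.put(key, addToAggregate.apply(agg == null ? zero.get() : agg, record));
        }
        cache.forEach(out); // end of partition: flush all remaining partial aggregates
    }
}
```

With `maxSize = 2` and keys cycling through three groups, the LRU policy evicts on every new insert and all nine inputs leave as partial aggregates; with `maxSize` of 3 or more, exactly one partial aggregate per key is emitted. Without a bound (as discussed later in the thread), the map simply grows to one entry per key seen in the partition.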

On the DataStream API, you can use a ProcessFunction with keyed ValueState holding the current AggregateT of each key. For each record, you fetch the aggregate from the state and update it.
To emit the results at the end, you'll need to register a timer, because otherwise the final aggregates are stored in the local state but never emitted.
Another thing to consider is the state backend. You'll probably have to use the RocksDBStateBackend to be able to spill state to disk.
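The mechanism can be illustrated with a plain-Java model; this is not Flink code. One map stands in for the keyed `ValueState<AggregateT>`, another for the timer service, and `processWatermark` mimics how event-time timers fire once the watermark reaches their timestamp. In an actual job, the corresponding pieces would be a `ProcessFunction` on a keyed stream that registers a timer via `ctx.timerService().registerEventTimeTimer(...)` and emits the aggregate from `onTimer`. The class name, its methods, and the choice of `Long.MAX_VALUE` as timer timestamp are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical plain-Java model of a keyed fold that emits only at the final watermark.
final class KeyedFoldModel<K, R, A> {
    private final Map<K, A> valueState = new HashMap<>(); // models ValueState<A> per key
    private final Map<K, Long> timers = new HashMap<>();  // models one event-time timer per key
    private final Function<R, K> keyOf;
    private final Supplier<A> zero;
    private final BiFunction<A, R, A> addToAggregate;

    KeyedFoldModel(Function<R, K> keyOf, Supplier<A> zero, BiFunction<A, R, A> addToAggregate) {
        this.keyOf = keyOf;
        this.zero = zero;
        this.addToAggregate = addToAggregate;
    }

    // Like processElement: update the key's aggregate and (re)register its timer.
    void processElement(R record) {
        K key = keyOf.apply(record);
        A agg = valueState.get(key);
        if (agg == null) {
            agg = zero.get();
        }
        valueState.put(key, addToAggregate.apply(agg, record));
        timers.put(key, Long.MAX_VALUE); // same key and timestamp: kept only once
    }

    // Like a watermark advance: fire every timer whose timestamp <= watermark.
    void processWatermark(long watermark, BiConsumer<K, A> out) {
        Iterator<Map.Entry<K, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> timer = it.next();
            if (timer.getValue() <= watermark) {
                K key = timer.getKey();
                out.accept(key, valueState.remove(key)); // like onTimer: emit and clear state
                it.remove();
            }
        }
    }
}
```

Records alone emit nothing; only a watermark at or beyond the timer timestamp (here `Long.MAX_VALUE`, which a bounded source emits at the end of its input) flushes the aggregates.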

Hope this helps,
Fabian


In reply to Urs Schoenenberger (2017-06-16 17:00 GMT+02:00)

Re: DataSet: combineGroup/reduceGroup with large number of groups

Urs Schoenenberger
Hi Fabian,

thanks, that is very helpful indeed. I now understand why the DataSet
drivers insist on sorting the buffers before processing them instead of
keeping state.

In our case, though, the state should easily fit into the heap of the
cluster. In a quick-and-dirty example I tried just now, the MapPartition
solution outperforms GroupReduce/Combine by a factor of 3; I'm looking
forward to testing this on our real data set soon.

Two things I'd like to clarify:

- Your suggestion of limiting the size of the HashMap in the
MapPartitionFunction is meant to reduce the risk of OOMEs, right? If
I'm confident my state fits into the heap, there's no reason to do this?

- With your DataStream suggestion, I can't tell when to schedule a
processing-time timer. I would therefore need to use an event-time timer
(at Long.MAX_VALUE-1, say) and modify my source to emit a watermark of
Long.MAX_VALUE once it reaches the end of the input, correct?

Thanks,
Urs

In reply to Fabian Hueske (16.06.2017 17:58)

Re: DataSet: combineGroup/reduceGroup with large number of groups

Fabian Hueske-2
Hi Urs,

ad 1) Yes, my motivation for the bound was to prevent OOMEs. If you have enough memory to hold the AggregateT of every key, you should be fine without a bound. If the size of an AggregateT depends on the number of aggregated elements, though, you might run into skew issues.
ad 2) AFAIK, all sources emit a Long.MAX_VALUE watermark once they have emitted all their data (which of course only happens for bounded data). You do not need to generate any other watermarks or timestamps, because you want the results only after all data has been processed.

Best, Fabian

In reply to Urs Schoenenberger (2017-06-17 0:19 GMT+02:00)