Sorting bounded data stream

Łukasz Jędrzejewski
Hi all,

In Flink 1.9, a couple of changes were introduced to deal with bounded
streams, e.g. the BoundedOneInput interface. Would it be doable to do
some kind of global sort after receiving the end-of-input event from a
finished data stream source, using only the DataStream API?

We have made some experiments with BoundedOneInput: buffering elements,
sorting them after receiving the end-of-input event, and finally
emitting the sorted elements. It seems to be working as expected, though
we are having trouble sorting a big stream efficiently. One problem is
the lack of an appropriate state type, something like SortedMapState.
When using MapState, the elements are kept in a kind of byte order. I
think it could be possible to modify the keys so that their byte order
matches the correct order, but that is not trivial for every type
(string, int, tuples, and so on). Do you plan to add such a sorted state?

The Flink Table API has a SortOperator, but it is restricted to
BinaryRow. Would it be possible to adapt this functionality to the
DataStream API for arbitrary types? What do you think?

Thanks,
Łukasz

Re: Sorting bounded data stream

Jingsong Li
Hi Łukasz,

First, we are planning to design and implement the BoundedStream story, which will be discussed further in 1.11 or 1.12.

SortedMapState was discussed in FLINK-6219 [1], but there were some problems that could not be solved well, so it has not been introduced.
 
If it is a pure bounded stream without checkpoints, using state is not recommended: state is mainly there to support checkpoints, and it brings extra overhead.

SortOperator was introduced for the Table API's BaseRow. I recommend using UnilateralSortMerger to build your own SortOperator.
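Roughly, UnilateralSortMerger implements an external multi-way merge sort. Records are collected into bounded memory buffers, and each full buffer is sorted and written out as a run (spilled to disk when memory runs out). The runs are merged at the end. The toy sketch below shows only that scheme in plain Java and is not UnilateralSortMerger's API. Here the runs stay in in-memory lists instead of being spilled to disk. ExternalSortSketch and runSize are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy external merge sort: bounded sorted runs plus a k-way merge (runs kept in memory here). */
class ExternalSortSketch<T> {
    private final Comparator<T> comparator;
    private final int runSize;                              // max records buffered before a spill
    private final List<List<T>> runs = new ArrayList<>();   // stand-in for spilled run files
    private List<T> buffer = new ArrayList<>();

    ExternalSortSketch(Comparator<T> comparator, int runSize) {
        this.comparator = comparator;
        this.runSize = runSize;
    }

    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= runSize) {
            spill();
        }
    }

    // Sort the current buffer and "spill" it as one sorted run.
    private void spill() {
        if (buffer.isEmpty()) return;
        buffer.sort(comparator);
        runs.add(buffer);
        buffer = new ArrayList<>();
    }

    // Called once at end of input: k-way merge of all sorted runs via a min-heap.
    List<T> finish() {
        spill();
        PriorityQueue<Head<T>> heap =
            new PriorityQueue<>((a, b) -> comparator.compare(a.value, b.value));
        for (List<T> run : runs) {
            Iterator<T> it = run.iterator();
            if (it.hasNext()) heap.add(new Head<>(it.next(), it));
        }
        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head<T> head = heap.poll();                     // smallest current head across runs
            out.add(head.value);
            if (head.rest.hasNext()) heap.add(new Head<>(head.rest.next(), head.rest));
        }
        runs.clear();
        return out;
    }

    // Current first element of one run plus an iterator over the rest of that run.
    private static final class Head<E> {
        final E value;
        final Iterator<E> rest;
        Head(E value, Iterator<E> rest) { this.value = value; this.rest = rest; }
    }

    public static void main(String[] args) {
        ExternalSortSketch<String> sorter = new ExternalSortSketch<>(Comparator.naturalOrder(), 2);
        for (String s : List.of("pear", "apple", "fig", "kiwi", "banana")) sorter.add(s);
        System.out.println(sorter.finish()); // [apple, banana, fig, kiwi, pear]
    }
}
```

The merge phase only needs the current head of each run in memory. That property lets a real spilling sorter handle inputs larger than its memory budget.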


Best,
Jingsong Lee

(Reply to Łukasz Jędrzejewski's message of Fri, Jan 31, 2020 at 2:08 AM.)

Re: Sorting bounded data stream

Łukasz Jędrzejewski

Hi,

Thank you very much for the suggestions. I will check out UnilateralSortMerger. However, in our case we are using checkpoints.

Kind regards,
Łukasz

(Reply to Jingsong Li's message of 31.01.2020, 07:54.)

Re: Sorting bounded data stream

Jingsong Li
Hi,

If you are using checkpoints, I think a simple way is to use a ListState to store all incoming records.
Then, in endInput(), drain all records from the ListState into a sorter and sort them.
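That approach can be sketched in plain Java, with the Flink classes replaced by stand-ins so the code runs on its own:

- A List plays the role of the ListState.
- processElement and endInput mirror the callbacks of OneInputStreamOperator and BoundedOneInput.
- A Consumer stands in for the operator output.

BufferThenSortSketch is a hypothetical name. In a real operator, the buffer would be a ListState registered in the operator state so that it is included in checkpoints. For big inputs, the in-memory sort would be replaced by a spilling sorter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for an operator that buffers all input and emits it sorted at end of input. */
class BufferThenSortSketch<T> {
    private final List<T> bufferedState = new ArrayList<>(); // stand-in for ListState<T>
    private final Comparator<T> comparator;
    private final Consumer<T> output;                         // stand-in for the operator output

    BufferThenSortSketch(Comparator<T> comparator, Consumer<T> output) {
        this.comparator = comparator;
        this.output = output;
    }

    // Mirrors processElement: only append; nothing is emitted before end of input.
    void processElement(T value) {
        bufferedState.add(value);
    }

    // Mirrors endInput: drain the buffer into a sorter, emit in order, then clear the state.
    void endInput() {
        List<T> sorter = new ArrayList<>(bufferedState);
        sorter.sort(comparator);
        for (T value : sorter) {
            output.accept(value);
        }
        bufferedState.clear();
    }

    public static void main(String[] args) {
        List<Integer> emitted = new ArrayList<>();
        BufferThenSortSketch<Integer> op =
            new BufferThenSortSketch<>(Comparator.naturalOrder(), emitted::add);
        for (int v : new int[] {3, 1, 2}) op.processElement(v);
        System.out.println(emitted); // []
        op.endInput();
        System.out.println(emitted); // [1, 2, 3]
    }
}
```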

Best,
Jingsong Lee

(Reply to Łukasz Jędrzejewski's message of Fri, Jan 31, 2020 at 3:10 PM.)