Hi all,
Flink 1.9 introduced a couple of changes for dealing with bounded streams, e.g. the BoundedOneInput interface. I'm wondering whether it would be feasible to do some kind of global sort after receiving the end-of-input event on a finished data stream source, using only the DataStream API.

We have experimented with BoundedOneInput: buffering elements, sorting them after receiving the end-of-input event, and finally emitting the sorted elements. It seems to be working as expected, though we are having trouble sorting a big stream efficiently. One problem is the lack of an appropriate state type, something like SortedMapState. With MapState, the elements are stored in a kind of byte order. I think it would be possible to modify the keys to achieve the correct byte order, but that is not trivial for every type (strings, ints, tuples, and so on). Do you plan to add such a sorted state?

In the Flink Table API there is SortOperator, but it is restricted to BinaryRow. Would it be possible to adapt this functionality to the streaming API for arbitrary types?

What do you think?

Thanks,
Łukasz
|
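To illustrate the "key modification" idea from the question, here is a minimal sketch for int keys only. It assumes the state backend iterates keys in unsigned lexicographic byte order and that the key is written as exactly these bytes; neither assumption is confirmed in this thread. The class name OrderPreservingIntKey and its methods are hypothetical and are not part of Flink.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not a Flink class: encodes an int so that
// unsigned byte-wise comparison of the encodings matches numeric order.
public class OrderPreservingIntKey {

    // Flip the sign bit, then write big-endian (ByteBuffer's default order).
    // Negative values then sort before non-negative ones byte-wise.
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value ^ Integer.MIN_VALUE).array();
    }

    // Inverse of encode: read big-endian and flip the sign bit back.
    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt() ^ Integer.MIN_VALUE;
    }

    public static void main(String[] args) {
        int[] values = {42, -7, 0, Integer.MIN_VALUE, Integer.MAX_VALUE, -1};

        byte[][] keys = new byte[values.length][];
        for (int i = 0; i < values.length; i++) {
            keys[i] = encode(values[i]);
        }

        // Sort the encoded keys the way a byte-ordered store would (Java 9+).
        Arrays.sort(keys, Arrays::compareUnsigned);

        int[] decoded = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            decoded[i] = decode(keys[i]);
        }
        // prints [-2147483648, -7, -1, 0, 42, 2147483647]
        System.out.println(Arrays.toString(decoded));
    }
}
```

This covers a single fixed-width integer type. Strings, variable-length values and tuples each need their own encoding, which is the non-trivial part the question refers to.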
Hi Łukasz,

First, we are planning to design and implement the BoundedStream story, which will be discussed further in 1.11 or 1.12.

SortedMapState was discussed in FLINK-6219 [1], but there are some problems that cannot be solved well, so it has not been introduced.

If it is a pure BoundedStream without checkpoints, using state is not recommended, because state is usually used for checkpoints and will cause more overhead.

SortOperator was introduced for the table BaseRow. I recommend that you use UnilateralSortMerger to construct your own SortOperator.

Best,
Jingsong Lee

On Fri, Jan 31, 2020 at 2:08 AM Łukasz Jędrzejewski <[hidden email]> wrote:
|
Hi,

Thank you very much for the suggestions. I will check out the UnilateralSortMerger. However, in our case we are using checkpoints.

Kind regards,

On 31.01.2020 at 07:54, Jingsong Li wrote:
|
Hi,

If you are using checkpoints, I think a simple way is to use a ListState to store all incoming records and then, in endInput(), drain all records from the ListState into a sorter to sort them.

Best,
Jingsong Lee

On Fri, Jan 31, 2020 at 3:10 PM Łukasz Jędrzejewski <[hidden email]> wrote:
|
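The buffer-then-sort flow suggested in the last reply can be sketched in plain Java, without any Flink dependency. Everything here is a stand-in: BufferThenSortSketch is a hypothetical class, its ArrayList plays the role of the ListState, and its processElement/endInput methods only mirror the shape of the operator callbacks discussed in the thread. In a real operator the buffer would be checkpointed state and the emit call would go to the operator's output.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a bounded-input operator; not a Flink class.
public class BufferThenSortSketch<T> {

    // Stands in for a ListState<T> holding every record seen so far.
    private final List<T> buffer = new ArrayList<>();
    private final Comparator<? super T> order;
    private final Consumer<? super T> downstream;

    public BufferThenSortSketch(Comparator<? super T> order,
                                Consumer<? super T> downstream) {
        this.order = order;
        this.downstream = downstream;
    }

    // Called once per incoming record: only buffer, emit nothing yet.
    public void processElement(T element) {
        buffer.add(element);
    }

    // Called once when the bounded input is finished:
    // sort everything that was buffered, emit in order, then clear.
    public void endInput() {
        buffer.sort(order);
        buffer.forEach(downstream);
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        BufferThenSortSketch<Integer> op =
            new BufferThenSortSketch<Integer>(Comparator.<Integer>naturalOrder(), out::add);

        for (int v : new int[] {5, -3, 9, 0}) {
            op.processElement(v);
        }
        op.endInput();

        System.out.println(out); // prints [-3, 0, 5, 9]
    }
}
```

The in-memory sort in endInput() does not address the large-stream concern from the original question. For that, the sort call would presumably be replaced by a sorter that can spill to disk, such as the UnilateralSortMerger mentioned earlier in the thread.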