I have a pretty big but finite stream and I need to be able to window it by number of elements. In this case, from my observations, Flink can 'skip' the latest chunk of data if it has fewer elements than the window size:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Output:
0,1,2,3,4,5,6,7,8,9
10,11,12,13,14,15,16,17,18,19
20,21,22,23,24,25,26,27,28,29

I.e. elements from 30 to 35 are not being processed. Does it make sense to have a count OR timeout window, which would emit a window when the number of elements reaches a threshold OR when a collecting timeout occurs?
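A minimal sketch of this kind of pipeline (the source and window function are assumptions; here the source emits 0..35):

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class CountWindowSkipsTail {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Finite source emitting 0..35; the last 6 elements never fill a window of 10.
    DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        for (long i = 0; i < 36; i++) {
          ctx.collect(i);
        }
      }

      @Override
      public void cancel() {
      }
    });

    source
        .countWindowAll(10) // tumbling count window of 10 elements
        .apply(new AllWindowFunction<Long, String, GlobalWindow>() {
          @Override
          public void apply(GlobalWindow window, Iterable<Long> values, Collector<String> out) {
            StringBuilder sb = new StringBuilder();
            for (Long value : values) {
              if (sb.length() > 0) {
                sb.append(",");
              }
              sb.append(value);
            }
            out.collect(sb.toString()); // one comma-separated line per full window
          }
        })
        .print();

    env.execute();
  }
}

With a window size of 10 and 36 elements, the trailing 6 elements never fill a window, so the count trigger never fires for them.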
Hi,
yes, you can achieve this by writing a custom Trigger that fires either when the count is reached or after a long-enough timeout. It would be a combination of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger), so you could look at those to get started.

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <[hidden email]> wrote:
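A rough sketch of such a combined trigger, written against a later Flink Trigger API (the class and parameter names are made up, and the exact TriggerContext methods vary across versions):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Hypothetical trigger: fires when maxCount elements have arrived OR when
// a processing-time timeout elapses after the first element of the window.
public class CountOrTimeoutTrigger extends Trigger<Object, GlobalWindow> {

  private final long maxCount;
  private final long timeoutMillis;

  private final ReducingStateDescriptor<Long> countDesc =
      new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

  public CountOrTimeoutTrigger(long maxCount, long timeoutMillis) {
    this.maxCount = maxCount;
    this.timeoutMillis = timeoutMillis;
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp,
                                 GlobalWindow window, TriggerContext ctx) throws Exception {
    ReducingState<Long> count = ctx.getPartitionedState(countDesc);
    if (count.get() == null) {
      // first element of this window: arm the timeout timer
      ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMillis);
    }
    count.add(1L);
    if (count.get() >= maxCount) {
      count.clear();
      return TriggerResult.FIRE_AND_PURGE; // count threshold reached
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                        TriggerContext ctx) throws Exception {
    // timeout elapsed: flush whatever has been collected so far
    ctx.getPartitionedState(countDesc).clear();
    return TriggerResult.FIRE_AND_PURGE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(countDesc).clear();
  }

  private static class Sum implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long a, Long b) {
      return a + b;
    }
  }
}

A real implementation would also have to handle a stale timeout timer firing after a count-based purge; this sketch ignores that.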
Thanks. I wonder, wouldn't it be good to have such functionality built in? At least when the incoming stream is finished, flush the remaining elements.

On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <[hidden email]> wrote:
People have wondered about that a few times, yes. My opinion is that a stream is potentially infinite and processing only stops for anomalous reasons: when the job crashes, or when stopping a job to later redeploy it. In those cases you would not want to flush out your data but keep it, and restart from the same state when the job is restarted. You can implement the behavior by writing a custom Trigger that behaves like the count trigger but also fires when receiving a Long.MAX_VALUE watermark. A watermark of Long.MAX_VALUE signifies that a source has stopped processing for natural reasons.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <[hidden email]> wrote:
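A sketch of that idea, with the same caveats as the sketch above (hypothetical names, later Trigger API). A trigger never sees watermarks directly, so it registers an event-time timer at Long.MAX_VALUE, which fires exactly when the final watermark arrives:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Hypothetical count trigger that also flushes the last, partially
// filled window when the final Long.MAX_VALUE watermark arrives.
public class FlushingCountTrigger extends Trigger<Object, GlobalWindow> {

  private final long maxCount;

  private final ReducingStateDescriptor<Long> countDesc =
      new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

  public FlushingCountTrigger(long maxCount) {
    this.maxCount = maxCount;
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp,
                                 GlobalWindow window, TriggerContext ctx) throws Exception {
    // a timer at Long.MAX_VALUE only fires on the final watermark
    ctx.registerEventTimeTimer(Long.MAX_VALUE);
    ReducingState<Long> count = ctx.getPartitionedState(countDesc);
    count.add(1L);
    if (count.get() >= maxCount) {
      count.clear();
      return TriggerResult.FIRE_AND_PURGE;
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
    // final watermark: flush the remaining elements
    return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(countDesc).clear();
  }

  private static class Sum implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long a, Long b) {
      return a + b;
    }
  }
}

It would be plugged in via source.windowAll(GlobalWindows.create()).trigger(new FlushingCountTrigger(10)).apply(...) in place of countWindowAll(10), since GlobalWindows leaves all firing decisions to the trigger.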
Maybe if it is not the first time, it is worth considering adding this as an option? ;-) My use case: I have a pretty big amount of data, basically for ETL. It is finite, but it is big. I see it more as a stream than as a dataset. Also, I would re-use the same code for an infinite stream later...

On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
if you are not using the windows for their actual semantics, I would suggest not using count-based windows, and also not using the *All windows. The *All windows are all non-parallel, i.e. you only ever get one parallel instance of your window operator, even if you have a huge cluster. Also, in most cases it is better not to use a plain WindowFunction with apply(), because all elements have to be buffered so that they can be passed as an Iterable (Iterable<Long> in your example). If you can, I would suggest using a ReduceFunction or FoldFunction, or an apply() with an incremental aggregation function: apply(ReduceFunction, WindowFunction) or apply(FoldFunction, WindowFunction). These allow incremental aggregation of the result as elements arrive and don't require buffering of all elements until the window fires.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin <[hidden email]> wrote:
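For illustration, a minimal sketch of a keyed count window with incremental aggregation, assuming a DataStream<Long> named source (the key derivation is made up, purely to obtain parallelism):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;

// Keyed count window with incremental aggregation: the ReduceFunction
// pre-aggregates as elements arrive, so only one value per key and
// window is buffered instead of all ten elements.
DataStream<Long> sums = source
    .keyBy(new KeySelector<Long, Long>() {
      @Override
      public Long getKey(Long value) {
        return value % 4; // hypothetical key: spreads load over 4 parallel groups
      }
    })
    .countWindow(10) // keyed count windows run in parallel, unlike countWindowAll
    .reduce(new ReduceFunction<Long>() {
      @Override
      public Long reduce(Long a, Long b) {
        return a + b; // incremental aggregation, no per-window buffering
      }
    });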
Thanks for the reply. Maybe I need some advice in this case. My situation: we have a stream of data, generally speaking <Long, String> tuples where the long is a unique key (i.e. there are no two tuples with the same key). I need to filter out all tuples that do not match a certain Lucene query. Creating a Lucene index on a single entry is too expensive, and I cannot guess what the load in terms of entries per second will be. The idea was to group entries by count, create an index, filter, and stream the remaining tuples for further processing. As a sample application, if we replace Lucene indexing with something like String's 'contains' method, the source would look like this:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

And I need, let's say, to window the tuples and preserve only those whose value.contains("3"). There is no grouping by key since basically all keys are different. I might not know everything about Flink yet, but for this particular example, does what you were saying make sense?

Thanks!
Kostya

On Thu, Apr 21, 2016 at 11:02 AM, Aljoscha Krettek <[hidden email]> wrote:
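A hypothetical source of that shape, with made-up values:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical sample input: <Long, String> tuples with unique keys.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, String>> source = env.fromElements(
    Tuple2.of(0L, "value 13"),
    Tuple2.of(1L, "value 3"),
    Tuple2.of(2L, "other"),
    Tuple2.of(3L, "value 31"));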
Hi,
I'm afraid I don't understand your use case yet. In your example you want to preserve only the elements where the string value contains a "3"? This can be done using a filter, as in

source.filter(value -> value.f1.contains("3"))

This is probably too easy, though, and I'm misunderstanding the problem.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 18:26 Kostya Kulagin <[hidden email]> wrote:
No problem at all; there are not many Flink people and a lot of people asking questions, so it must be hard to understand each person's issues :) Yes, it is not as easy as a 'contains' operator: I need to collect some number of tuples in order to create an in-memory Lucene index. After that I will filter entries based on some predefined query. So, in a simplified case:

-> for a window of tuples (preferably based on element count)
-> apply some operation to all elements in the window (create an index in my case, but let's say string concatenation would work as well, i.e. any operation that involves all of the window's tuples and produces some resulting data)
-> filter each of the window's elements based on the resulting data of this all-window-elements operation
-> emit the filtered tuples

On Fri, Apr 22, 2016 at 9:27 AM, Aljoscha Krettek <[hidden email]> wrote:
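A sketch of that pipeline, using the window's average value length as a stand-in for the Lucene index (all names are hypothetical; source is a DataStream<Tuple2<Long, String>> as in the sample above):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// Per window: first derive something from ALL elements (here the average
// value length; in the real use case, an in-memory Lucene index), then
// filter each element against it and emit the survivors.
DataStream<Tuple2<Long, String>> filtered = source
    .countWindowAll(100)
    .apply(new AllWindowFunction<Tuple2<Long, String>, Tuple2<Long, String>, GlobalWindow>() {
      @Override
      public void apply(GlobalWindow window, Iterable<Tuple2<Long, String>> values,
                        Collector<Tuple2<Long, String>> out) {
        // pass 1: the all-window-elements operation
        long total = 0;
        long count = 0;
        for (Tuple2<Long, String> t : values) {
          total += t.f1.length();
          count++;
        }
        double avg = count == 0 ? 0.0 : (double) total / count;
        // pass 2: filter each element against the per-window result
        for (Tuple2<Long, String> t : values) {
          if (t.f1.length() >= avg) { // stand-in for "matches the Lucene query"
            out.collect(t);
          }
        }
      }
    });

As noted earlier in the thread, this keeps the non-parallel *All window and buffers the whole window; that is the price of an operation that genuinely needs all of the window's elements.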
In reply to this post by Aljoscha Krettek
I was trying to implement this (force Flink to handle all values from the input) but had no success... Probably I am not getting something about Flink's windowing mechanism. I've created my 'finishing' trigger, which is basically a copy of the purging trigger, but was not able to make it work.

On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <[hidden email]> wrote:
I was finally able to do that. Kinda ugly, but it works: https://gist.github.com/krolen/ed1344e4d7be5b2116061685268651f5

On Fri, Apr 22, 2016 at 6:14 PM, Konstantin Kulagin <[hidden email]> wrote:
Yes, this looks correct for a Counting Trigger that flushes when the sources finish. Could you also solve your filtering problem with this, or is this still an open issue?

Cheers,
Aljoscha

On Sat, 23 Apr 2016 at 16:57 Konstantin Kulagin <[hidden email]> wrote:
Thanks! Now I can call myself a super Flink developer :) Thanks for your help!

On Mon, Apr 25, 2016 at 9:26 AM, Aljoscha Krettek <[hidden email]> wrote: