Count windows missing last elements?

Count windows missing last elements?

Kostya Kulagin
I have a pretty big but finite stream, and I need to be able to window it by number of elements.
In this case, from my observations, Flink can 'skip' the last chunk of data if it has fewer elements than the window size:

    import java.util.stream.LongStream;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.util.Collector;

    import com.google.common.base.Joiner;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // finite source: emits the numbers 0..34 and then finishes
    DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            LongStream.range(0, 35).forEach(ctx::collect);
        }

        @Override
        public void cancel() {
        }
    });

    // tumbling count window of 10 over the whole (non-keyed) stream;
    // the window function prints each window's contents
    source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
        @Override
        public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
            System.out.println(Joiner.on(',').join(values));
        }
    }).print();

    env.execute("yoyoyo");

Output:
0,1,2,3,4,5,6,7,8,9
10,11,12,13,14,15,16,17,18,19
20,21,22,23,24,25,26,27,28,29

I.e. the last elements (30 to 34) are never processed.

Does it make sense to have a count-or-timeout window that emits a window either when the number of elements reaches a threshold or when a collection timeout occurs?

Re: Count windows missing last elements?

Aljoscha Krettek
Hi,
yes, you can achieve this by writing a custom Trigger that fires either when the count is reached or after a long-enough timeout. It would be a combination of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger), so you could look at those to get started.
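
For illustration, a rough, untested sketch of such a combined trigger, written against the 1.0-era Trigger API (treat the exact signatures and the state handling as assumptions to verify against your Flink version):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.Window;

    // Hypothetical combination of CountTrigger and ProcessingTimeTrigger:
    // fires when the window reaches maxCount elements OR when a
    // processing-time timeout elapses, whichever happens first.
    public class CountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {

        private final long maxCount;
        private final long timeoutMillis;

        private final ValueStateDescriptor<Long> countDesc =
                new ValueStateDescriptor<>("count", Long.class, 0L);

        public CountOrTimeoutTrigger(long maxCount, long timeoutMillis) {
            this.maxCount = maxCount;
            this.timeoutMillis = timeoutMillis;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, W window,
                                       TriggerContext ctx) throws Exception {
            ValueState<Long> count = ctx.getPartitionedState(countDesc);
            long current = count.value() + 1;
            if (current == 1) {
                // first element of a fresh window: arm the timeout
                ctx.registerProcessingTimeTimer(System.currentTimeMillis() + timeoutMillis);
            }
            if (current >= maxCount) {
                count.update(0L);
                return TriggerResult.FIRE_AND_PURGE;
            }
            count.update(current);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
            // timeout elapsed before the count was reached: flush what we have
            ctx.getPartitionedState(countDesc).update(0L);
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
    }

You would then use it with a GlobalWindows assigner instead of countWindowAll, e.g. source.windowAll(GlobalWindows.create()).trigger(new CountOrTimeoutTrigger<>(10, 5000)).apply(...). A real implementation would also delete the pending timer once the count fires first, and guard against firing an empty window when the timer goes off after a purge.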

Cheers,
Aljoscha

Re: Count windows missing last elements?

Kostya Kulagin
Thanks,

I wonder whether it would be good to have such functionality built in. At least when the incoming stream is finished, flush the remaining elements.


Re: Count windows missing last elements?

Aljoscha Krettek
People have wondered about that a few times, yes. My opinion is that a stream is potentially infinite and processing only stops for anomalous reasons: when the job crashes, or when a job is stopped to be redeployed later. In those cases you would not want to flush out your data but keep it, and restart from the same state when the job is restarted.

You can implement the behavior by writing a custom Trigger that behaves like the count trigger but also fires when receiving a Long.MAX_VALUE watermark. A watermark of Long.MAX_VALUE signifies that a source has stopped processing for natural reasons.
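
A minimal sketch of that variant (same caveats as the trigger sketch above: the 1.0-era API is assumed, and the event-time timer at Long.MAX_VALUE only fires when that final watermark arrives):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.Window;

    // Hypothetical count trigger that additionally flushes the last,
    // partial window when the Long.MAX_VALUE watermark arrives, i.e.
    // when a finite source has finished.
    public class FlushingCountTrigger<W extends Window> extends Trigger<Object, W> {

        private final long maxCount;

        private final ValueStateDescriptor<Long> countDesc =
                new ValueStateDescriptor<>("count", Long.class, 0L);

        public FlushingCountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, W window,
                                       TriggerContext ctx) throws Exception {
            // this timer can only fire on the final watermark
            ctx.registerEventTimeTimer(Long.MAX_VALUE);

            ValueState<Long> count = ctx.getPartitionedState(countDesc);
            long current = count.value() + 1;
            if (current >= maxCount) {
                count.update(0L);
                return TriggerResult.FIRE_AND_PURGE;
            }
            count.update(current);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
            // end of stream: flush whatever is still buffered
            return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
    }

Whether the final watermark actually reaches the trigger can depend on the time characteristic of the job, so treat this as a sketch to verify.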

Cheers,
Aljoscha


Re: Count windows missing last elements?

Kostya Kulagin
Maybe, if this is not the first time it has come up, it is worth considering adding this as an option? ;-)

My use case: I have a pretty big amount of data, basically for ETL. It is finite, but it is big, and I see it more as a stream than as a dataset. I would also like to reuse the same code for an infinite stream later.
And I do not care much about the exact window size - I create the windows purely for performance reasons.

Anyway - thank you for the responses!

Re: Count windows missing last elements?

Aljoscha Krettek
Hi,
if you are not using the windows for their actual semantics, I would suggest not using count-based windows, and also not using the *All windows. The *All windows are non-parallel, i.e. you only ever get one parallel instance of your window operator, even if you have a huge cluster.

Also, in most cases it is better not to use a plain WindowFunction with apply, because all elements have to be buffered so that they can be passed as an Iterable (Iterable<Long> in your example). If you can, I would suggest using a ReduceFunction or FoldFunction, or an apply() with an incremental aggregation function: apply(ReduceFunction, WindowFunction) or apply(FoldFunction, WindowFunction). These allow incremental aggregation of the result as elements arrive and don't require buffering all elements until the window fires.
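
As a rough illustration of the incremental variant (assuming the 1.0-era AllWindowedStream.apply(ReduceFunction, AllWindowFunction) overload, with a sum as a stand-in aggregation):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.util.Collector;

    // the ReduceFunction pre-aggregates elements as they arrive, so the
    // window state holds one running value instead of all ten elements
    source.countWindowAll(10)
            .apply(
                    new ReduceFunction<Long>() {
                        @Override
                        public Long reduce(Long a, Long b) throws Exception {
                            return a + b; // incremental aggregation on arrival
                        }
                    },
                    new AllWindowFunction<Long, Long, GlobalWindow>() {
                        @Override
                        public void apply(GlobalWindow window, Iterable<Long> values,
                                          Collector<Long> out) throws Exception {
                            // values now holds the single pre-aggregated result
                            out.collect(values.iterator().next());
                        }
                    })
            .print();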

Cheers,
Aljoscha


Re: Count windows missing last elements?

Kostya Kulagin
Thanks for the reply.

Maybe I need some advice in this case. My situation: we have a stream of data - generally speaking, <Long, String> tuples, where the Long is a unique key (i.e. there are no two tuples with the same key).

I need to filter out all tuples that do not match a certain Lucene query.

Creating a Lucene index for a single entry is too expensive, and I cannot predict the load in terms of entries per second. The idea was to group entries by count, create an index, filter, and stream the remaining tuples on for further processing.

As a sample application, if we replace the Lucene indexing with something like String's 'contains' method, the source would look like this:


    import org.apache.flink.api.java.tuple.Tuple2;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // finite source of (unique key, text) tuples
    DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
        @Override
        public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
            LongStream.range(0, 30).forEach(l -> ctx.collect(Tuple2.of(l, "This is " + l)));
        }

        @Override
        public void cancel() {
        }
    });

And I need, let's say, to window the tuples and preserve only those whose value contains "3".
There is no grouping by key, since basically all keys are different. I might not know everything about Flink yet, but for this particular example - does what you were saying make sense?


Thanks!
Kostya

Re: Count windows missing last elements?

Aljoscha Krettek
Hi,
I'm afraid I don't understand your use case yet. In your example you want to preserve only the elements whose string value contains a "3"? This can be done with a plain filter, as in

    source.filter(value -> value.f1.contains("3"))

This is probably too easy, though, and I'm misunderstanding the problem.

Cheers,
Aljoscha


Re: Count windows missing last elements?

Kostya Kulagin
No problem at all - there are not many Flink people and a lot of people asking questions, so it must be hard to keep track of each person's issues :)


Yes, it is not as simple as a 'contains' operator: I need to collect some number of tuples in order to create an in-memory Lucene index. After that I will filter the entries based on some predefined query.

So, in a simplified case:
   -> for a window of tuples (preferably based on element count)
   -> apply some operation to all elements in the window (create an index in my case, but let's say string concatenation would work as well, i.e. any operation that involves all of the window's tuples and produces some resulting data)
   -> filter each of the window's elements based on the resulting data of this all-window-elements operation
   -> emit the filtered tuples

It might be a bit hard to understand. If it is - never mind.
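
In code, the rough shape I have in mind is something like this (matchesQuery() is a hypothetical stand-in for the real Lucene query, FlushingCountTrigger is the end-of-stream trigger sketched earlier in the thread, and in the real job the batch would feed an in-memory Lucene index instead of a list):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.util.Collector;

    DataStream<Tuple2<Long, String>> filtered = source
            .windowAll(GlobalWindows.create())
            // batch by count, but also flush the final partial batch
            .trigger(new FlushingCountTrigger<>(1000))
            .apply(new AllWindowFunction<Tuple2<Long, String>, Tuple2<Long, String>, GlobalWindow>() {
                @Override
                public void apply(GlobalWindow window,
                                  Iterable<Tuple2<Long, String>> values,
                                  Collector<Tuple2<Long, String>> out) throws Exception {
                    // 1. gather the batch (in the real job: build the
                    //    in-memory index over it)
                    List<Tuple2<Long, String>> batch = new ArrayList<>();
                    for (Tuple2<Long, String> t : values) {
                        batch.add(t);
                    }
                    // 2. evaluate the query once over the batch and
                    // 3. emit only the matching tuples
                    for (Tuple2<Long, String> t : batch) {
                        if (matchesQuery(t.f1)) { // stand-in for the Lucene query
                            out.collect(t);
                        }
                    }
                }
            });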

Re: Count windows missing last elements?

Kostya Kulagin
In reply to this post by Aljoscha Krettek
I was trying to implement this (forcing Flink to handle all values from the input) but had no success...
Probably I am not getting something about Flink's windowing mechanism.
I've created my 'finishing' trigger, which is basically a copy of PurgingTrigger, but I was not able to make it work:

https://gist.github.com/krolen/9e6ba8b14c54554bfbc10fdfa6fe7308

I was never able to see the numbers from 30 to 34 in the result.
What am I doing wrong?



Re: Count windows missing last elements?

Kostya Kulagin
I finally was able to do it. Kinda ugly, but it works:

https://gist.github.com/krolen/ed1344e4d7be5b2116061685268651f5

Re: Count windows missing last elements?

Aljoscha Krettek
Yes, this looks correct for a counting trigger that flushes when the sources finish. Could you also solve your filtering problem with this, or is that still an open issue?

Cheers,
Aljoscha


Re: Count windows missing last elements?

Kostya Kulagin
Thanks!

Now I can call myself a super Flink developer :)

As for the issue - I am still trying to figure out ways to do that. I've raised a question in this thread:

Thanks for your help!

