Values are missing, probably due parallelism?

Kostya Kulagin
I think it has something to do with parallelism, and I probably do not have a clear understanding of how parallelism works in Flink, but consider this example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            LongStream.range(0, 29).forEach(ctx::collect);
        }

        @Override
        public void cancel() {
        }
    });

    source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
        @Override
        public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
            for (Long value : values) {
                if (value % 3 == 0) {
                    out.collect(value);
                }
            }
        }
    }).print();

    env.execute("yoyoyo");

Why is my output like this?

4> 9
1> 0
1> 12
3> 6
3> 18
2> 3
2> 15

Where is the value 24, for example? I expected to see it. What am I doing wrong?
Re: Values are missing, probably due parallelism?

Aljoscha Krettek
Hi,
this is related to your other question about count windows. The source emits 29 values so we only have two count-windows with 10 elements each. The last window is never triggered.

Cheers,
Aljoscha

Re: Values are missing, probably due parallelism?

Kostya Kulagin

Actually, this is not true: the source emits 30 values, since it starts at 0. If I change 29 to 33, the result is the same.
I can get all the values if I play with parallelism, e.g. by setting the parallelism to 1 before print,
or if I change 29 to 39 (I have 4 cores).
My guess is that something is wrong with the threads. BTW, in this case, how are the threads created and how does data flow between them?

Re: Values are missing, probably due parallelism?

Aljoscha Krettek
Hi,
which version of Flink are you using? Maybe there is a bug. I've tested it on the git master (1.1-SNAPSHOT) and it works fine with varying degrees of parallelism if I change the source to emit 30 elements:
LongStream.range(0, 30).forEach(ctx::collect);

(The second argument of LongStream.range(start, end) is exclusive)
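
To illustrate the point about the exclusive upper bound, here is a minimal sketch in plain Java with no Flink involved. The class name RangeCountCheck and the helper fullWindows are made up for this illustration; the window size of 10 is taken from the countWindowAll(10) call in the original post.

```java
import java.util.stream.LongStream;

class RangeCountCheck {
    // Number of count windows of the given size that fill up completely.
    // (Hypothetical helper, not part of any Flink API.)
    static long fullWindows(long elements, long windowSize) {
        return elements / windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 10;
        for (long end : new long[] {29, 30}) {
            // The upper bound is exclusive: range(0, end) yields 0 .. end-1.
            long count = LongStream.range(0, end).count();
            System.out.println("range(0, " + end + ") emits " + count + " values: "
                    + fullWindows(count, windowSize) + " full windows, "
                    + (count % windowSize) + " left over");
        }
    }
}
```

With an end of 29 the last window only ever holds 9 elements, so a count trigger of 10 has nothing to fire on; with 30 the third window fills up.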

Cheers,
Aljoscha



Re: Values are missing, probably due parallelism?

Kostya Kulagin
First of all, you are right about the number of elements. My bad, and sorry for the confusion; I need to get better at counting :)

However, if I change the parallelism of the windowing to, let's say, 2 (of course I changed 29 to 30 as well :) ), i.e. instead of

    }).print();

I put

    }).setParallelism(2).print();

at the very bottom, I get:

3> 15
3> 12
2> 9
2> 6
4> 18
04/21/2016 07:47:08 Sink: Unnamed(2/4) switched to FINISHED
04/21/2016 07:47:08 Source: Custom Source(1/1) switched to FINISHED
04/21/2016 07:47:08 Sink: Unnamed(4/4) switched to FINISHED
04/21/2016 07:47:08 Sink: Unnamed(3/4) switched to FINISHED
04/21/2016 07:47:08 TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(2/2) switched to FINISHED
04/21/2016 07:47:08 TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(1/2) switched to FINISHED
1> 3
1> 0
With the default parallelism setting it works fine, and likewise with the values 3 and 1.

With 2 or 4+ it does not work; with 4+ it simply prints nothing. So might it be something to do with how the threads finish their execution?

I am using the latest production version I found in Maven: 1.0.1.
Can snapshot versions be used in production? I mean, how well tested are they?

I will try the same on the master branch later today.

Thanks!
Kostya


Re: Values are missing, probably due parallelism?

Aljoscha Krettek
Hi,
no worries, I also had to read the doc to figure it out. :-)

I now see what the problem is. The .countWindowAll().apply() pattern creates a WindowOperator with a parallelism of 1, because the "count all" only works if one instance of the window operator sees all elements. When you manually change the parallelism, it essentially becomes a "count per parallel instance" window operation, and the elements from the source (which has parallelism 1) get distributed round-robin to the parallel instances of the count-window operator. This means that the source has to emit more elements before each instance of the window fires. It's a bit confusing, but Flink does not allow forcing the parallelism to 1 right now.
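
The effect described above can be reproduced without Flink. The following is a rough simulation in plain Java, not Flink code: the class RoundRobinCountWindows and its simulate method are hypothetical, and it assumes that element i goes to instance i % parallelism and that each instance fires and purges after exactly 10 buffered elements (which is what the PurgingTrigger(CountTrigger(10)) in the log suggests).

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinCountWindows {
    // Simulates a source with parallelism 1 that emits 0 .. elements-1 and hands
    // them round-robin to `parallelism` window instances. Each instance fires and
    // purges once it has buffered `windowSize` elements; on firing it emits the
    // multiples of 3, like the AllWindowFunction from the original post.
    static List<Long> simulate(int elements, int parallelism, int windowSize) {
        List<List<Long>> buffers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buffers.add(new ArrayList<>());
        }
        List<Long> emitted = new ArrayList<>();
        for (long value = 0; value < elements; value++) {
            // Assumption: plain modulo assignment stands in for round-robin.
            List<Long> buffer = buffers.get((int) (value % parallelism));
            buffer.add(value);
            if (buffer.size() == windowSize) {
                for (Long v : buffer) {
                    if (v % 3 == 0) {
                        emitted.add(v);
                    }
                }
                buffer.clear();
            }
        }
        // Anything still sitting in a buffer here was never emitted.
        return emitted;
    }

    public static void main(String[] args) {
        for (int parallelism = 1; parallelism <= 4; parallelism++) {
            System.out.println("parallelism " + parallelism + ": "
                    + simulate(30, parallelism, 10));
        }
    }
}
```

Under these assumptions, 30 elements with parallelism 2 give each instance 15 elements, so each fires once and only the multiples of 3 up to 18 come out. With parallelism 4 no instance ever reaches 10 elements and nothing is emitted. Parallelism 1 and 3 emit every multiple of 3 up to 27. This agrees with the behaviour reported earlier in the thread.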

About using the snapshot version: I would suggest you don't use it unless you absolutely need one of the features in there that is not yet released. The builds are still pretty stable, however.

Cheers,
Aljoscha

Re: Values are missing, probably due parallelism?

Kostya Kulagin
Thanks, so you were right: it really is connected to the non-finishing windows problem I mentioned in the other post.
I don't really need a parallelism of 1 for windows. I expect the operation on windows to be pretty expensive, and I like the idea that I can "parallelize" it.

Thanks for the explanation!
