[ANNOUNCE] Apache Flink 1.3.3 released


Re: Strange behavior on filter, group and reduce DataSets

Fabian Hueske-2
Hi,

I've opened a pull request [1] that should fix the problem.
It would be great if you could try the change and report back whether it fixes the problem.

Thank you,
Fabian

[1] https://github.com/apache/flink/pull/5742

2018-03-21 9:49 GMT+01:00 simone <[hidden email]>:

Hi all,

An update: following Stephan's directions on how to diagnose the issue, I made Person immutable, and the problem no longer occurs.

Simone.


On 20/03/2018 20:20, Stephan Ewen wrote:
To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final fields, no setters, set fields in constructor instead). Does that make the problem go away?

  - Change the Person data type to not be a POJO by adding a dummy field that is never used and does not have a getter/setter. Does that make the problem go away?

If either of these is the case, it must be a mutability bug somewhere, caused by either accidental object reuse or accidental serializer sharing.
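
A minimal sketch of the first check, for illustration only. The thread does not show the fields of Person, so the name and age fields and the withAge helper below are assumptions. Note that, under Flink's documented POJO rules (which require a public no-argument constructor), a class shaped like this would most likely no longer be treated as a POJO either.

```java
public class ImmutablePersonSketch {

    // Hypothetical stand-in for the Person type discussed in the thread.
    // All fields are final and are set only in the constructor.
    static final class Person {
        private final String name;
        private final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() {
            return name;
        }

        int getAge() {
            return age;
        }

        // No setters: a "change" produces a new object and leaves this one untouched.
        Person withAge(int newAge) {
            return new Person(name, newAge);
        }
    }

    public static void main(String[] args) {
        Person original = new Person("Ann", 30);
        Person updated = original.withAge(31);
        System.out.println(original.getAge() + " " + updated.getAge()); // prints "30 31"
    }
}
```

Because no code path can modify an existing instance, any stage that happens to hold a shared reference to the same object still sees the original values.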


On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <[hidden email]> wrote:
Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think could help to resolve the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031

2018-03-19 16:35 GMT+01:00 simone <[hidden email]>:

Hi Fabian,

This simple code reproduces the behavior -> https://github.com/xseris/Flink-test-union

Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:
Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is replicated, filtered, reduced, and unioned.
There is nothing between the filter and the reduce that could cause incorrect behavior.

The good thing is, the optimizer seems to be fine. The bad thing is, it is either the Flink runtime code or your functions.
Given that one plan produces good results, it might be the Flink runtime code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske <[hidden email]>:
Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske <[hidden email]>:
Hi Simone,

Looking at the plan, I don't see why this should be happening. The pseudo code looks fine as well.
Any chance that you can create a minimal program to reproduce the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone <[hidden email]>:

Hi Fabian,

Object reuse is not enabled. I have attached the execution plan.

Thanks,
Simone


On 19/03/2018 11:36, Fabian Hueske wrote:
Hi,

Union is actually a very simple operator (not even an operator in Flink terms). It just merges two inputs. There is no additional logic involved.
Therefore, it should also not emit records before either of the two ReduceFunctions has sorted its data.
Once the data has been sorted for the ReduceFunction, the data is reduced and emitted in a pipelined fashion, i.e., once the first record is reduced, it is forwarded into the MapFunction (passing the unioned inputs).
So it is not unexpected that the Map starts processing before the ReduceFunction has terminated.

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse objects, you have to be careful about how you implement your functions.
If not, can you share the plan (ExecutionEnvironment.getExecutionPlan()) that was generated for the program?
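
To illustrate why care is needed when objects are reused, here is a plain-Java analogy. It is not Flink runtime code and makes no claim about the actual cause of this issue. The Counter class and both reduce functions are hypothetical. In Flink itself the setting is toggled via env.getConfig().enableObjectReuse() and disableObjectReuse().

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class SharedMutationSketch {

    // Hypothetical mutable record (not the Person class from the thread).
    static final class Counter {
        String key;
        int count;

        Counter(String key, int count) {
            this.key = key;
            this.count = count;
        }
    }

    // Reduce that modifies and returns its first argument.
    // This is unsafe when another consumer holds a reference to the same object.
    static final BinaryOperator<Counter> MUTATING = (a, b) -> {
        a.count += b.count;
        return a;
    };

    // Reduce that leaves both inputs untouched and returns a fresh object.
    static final BinaryOperator<Counter> COPYING =
            (a, b) -> new Counter(a.key, a.count + b.count);

    // Folds the records from left to right and returns the final count.
    static int reduceTotal(List<Counter> records, BinaryOperator<Counter> fn) {
        Counter acc = records.get(0);
        for (int i = 1; i < records.size(); i++) {
            acc = fn.apply(acc, records.get(i));
        }
        return acc.count;
    }

    // Runs the same reduce twice over the SAME objects, the way two branches
    // would if they accidentally shared references to their input records.
    static int[] runTwoBranches(BinaryOperator<Counter> fn) {
        List<Counter> shared = Arrays.asList(
                new Counter("a", 1), new Counter("a", 2), new Counter("a", 3));
        int first = reduceTotal(shared, fn);
        int second = reduceTotal(shared, fn);
        return new int[] {first, second};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runTwoBranches(MUTATING))); // prints "[6, 11]"
        System.out.println(Arrays.toString(runTwoBranches(COPYING)));  // prints "[6, 6]"
    }
}
```

The second branch of the mutating variant starts from a record that the first branch already changed, so the two results differ even though the logic is identical.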

Thanks,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions



2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <[hidden email]>:
Any help on this? This is very strange: the "manual" union of the outputs of the two DataSets differs from the Flink union of them.
Could it be a problem with the Flink optimizer?

Best,
Flavio

On Fri, Mar 16, 2018 at 4:01 PM, simone <[hidden email]> wrote:

Sorry, I translated the code into pseudocode too quickly. That is indeed an equals call.


On 16/03/2018 15:58, Kien Truong wrote:

Hi,

Just a guess, but string comparison in Java should use the equals method, not the == operator.

Regards,

Kien


On 3/16/2018 9:47 PM, simone wrote:
subject.getField("field1") == "";
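
For reference, the difference Kien points out can be shown in a small standalone sketch. The two helper methods are hypothetical and are not taken from Simone's program.

```java
public class StringEqualitySketch {

    // Compares references: true only if s is the very same object as the "" literal.
    static boolean isEmptyByIdentity(String s) {
        return s == "";
    }

    // Compares contents; calling equals on the literal also makes this null-safe.
    static boolean isEmptyByEquals(String s) {
        return "".equals(s);
    }

    public static void main(String[] args) {
        // A string created at runtime (e.g. read from a record) is a distinct object.
        String runtimeEmpty = new String("");
        System.out.println(isEmptyByIdentity(runtimeEmpty)); // prints "false"
        System.out.println(isEmptyByEquals(runtimeEmpty));   // prints "true"
    }
}
```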














Re: Strange behavior on filter, group and reduce DataSets

Fabian Hueske-2
Hi,

That was a bit too early.
I found an issue with my approach. I'll come back once I have solved that.

Best, Fabian

2018-03-21 23:45 GMT+01:00 Fabian Hueske <[hidden email]>:
Hi,

I've opened a pull request [1] that should fix the problem.
It would be great if you could try the change and report back whether it fixes the problem.

Thank you,
Fabian

[1] https://github.com/apache/flink/pull/5742
















Re: Strange behavior on filter, group and reduce DataSets

simone

Hi Fabian,

any update on this? Did you fix it?

Best, Simone.


On 22/03/2018 00:24, Fabian Hueske wrote:
Hi,

That was a bit too early.
I found an issue with my approach. I'll come back once I have solved that.

Best, Fabian

















Re: Strange behavior on filter, group and reduce DataSets

Fabian Hueske-2
Hi,
Yes, I've updated the PR.
It needs a review and should be included in Flink 1.5.

Cheers, Fabian

On Mon, Mar 26, 2018, 12:01, simone <[hidden email]> wrote:

Hi Fabian,

any update on this? Did you fix it?

Best, Simone.
















