[ANNOUNCE] Apache Flink 1.3.3 released

[ANNOUNCE] Apache Flink 1.3.3 released

Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache Flink 1.3.3, which is the third bugfix release for the Apache Flink 1.3 series. 

Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications. 

The release is available for download at: 
https://flink.apache.org/downloads.html 

Please check out the release blog post for an overview of the improvements for this bugfix release:
https://flink.apache.org/news/2018/03/15/release-1.3.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12341142

We would like to thank all contributors of the Apache Flink community who made this release possible! 

Cheers, 
Gordon 


Re: [ANNOUNCE] Apache Flink 1.3.3 released

Fabian Hueske
Thanks for managing this release Gordon!

Cheers, Fabian




Re: [ANNOUNCE] Apache Flink 1.3.3 released

Till Rohrmann
Thanks for managing the release Gordon and also thanks to the community!

Cheers,
Till





Re: [ANNOUNCE] Apache Flink 1.3.3 released

Stephan Ewen
This release fixes a critical bug that could lead to loss of checkpoint state: https://issues.apache.org/jira/browse/FLINK-7783

We recommend that all users on Flink 1.3.2 upgrade to 1.3.3.







Strange behavior on filter, group and reduce DataSets

simone
Hi all,
I am using Flink 1.3.1 and I have found a strange behavior when running the following logic:
  1. Read data from a file and store it in a DataSet<POJO>.
  2. Split the dataset in two by checking whether "field1" of the POJOs is empty, so that the first dataset contains only elements with a non-empty "field1" and the second dataset contains the remaining elements.
  3. Group each dataset, one by "field1" and the other by a different field, and then reduce it.
  4. Merge the two datasets with a union.
  5. Write the final dataset as JSON.
What I expected in the output was to find only one element per distinct value of "field1", because:
  1. Reducing the first dataset grouped by "field1" should generate only one element per distinct value of "field1".
  2. The second dataset should contain only elements with an empty "field1".
  3. Taking the union of the two should not duplicate any record.
This does not happen: when I read the generated JSON, I see some duplicate (non-empty) values of "field1".
Strangely, this does not happen when the union of the two datasets is not computed. In that case the first dataset produces elements only with distinct values of "field1", while the second dataset produces only records with an empty "field1".

Debugging the code, it seems that the map function used to convert the final merged dataset into JSON strings starts before the reduce functions terminate. This seems to produce the duplicates.
Here is my pseudocode:

DataSet<POJO> subjects = read from csv...

DataSet<POJO> subjectsWithCondition = subjects.filter(new FilterFunction<POJO>(){
    @Override
    public boolean filter(POJO subject) throws Exception {
        return subject.getField("field1") != "";
    }
}).groupBy("field1").reduce(new ReduceFunction<POJO>() {
    @Override
    public POJO reduce(POJO subject1, POJO subject2) {
        POJO ret = subject1;
        return ret;
    }
});

DataSet<POJO> subjectsWithoutCondition = subjects.filter(new FilterFunction<POJO>(){
    @Override
    public boolean filter(POJO subject) throws Exception {
        return subject.getField("field1") == "";
    }
}).groupBy("field2").reduce(new ReduceFunction<POJO>() {
    @Override
    public POJO reduce(POJO subject1, POJO subject2) {
        POJO ret = subject1;
        return ret;
    }
});

DataSet<POJO> allSubjects = subjectsWithCondition.union(subjectsWithoutCondition);

DataSet<String> jsonSubjects = allSubjects.map(new RichMapFunction<POJO, String>() {
    private static final long serialVersionUID = 1L;
    ObjectMapper mapper = new ObjectMapper();

    @Override
    public String map(POJO subject) throws Exception {
        return mapper.writeValueAsString(subject);
    }
});

jsonSubjects.writeAsText("/tmp/subjects/", WriteMode.OVERWRITE);
env.execute("JSON generation");

What is the problem? Did I make a mistake in the filtering, grouping, or reducing logic?
Thanks in advance,
Simone.

Re: Strange behavior on filter, group and reduce DataSets

Kien Truong

Hi,

Just a guess, but string comparison in Java should use the equals method, not the == operator.

Regards,

Kien


On 3/16/2018 9:47 PM, simone wrote:
subject.getField("field1") == "";
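For reference, a minimal illustration of the difference: == compares object references, while equals compares string contents, so == can return false for equal strings.

```java
public class StringCompareDemo {
    public static void main(String[] args) {
        String a = "field1";
        String b = new String("field1"); // same contents, different object

        // == compares object identity, so this is false:
        System.out.println(a == b);      // prints "false"

        // equals compares character contents, so this is true:
        System.out.println(a.equals(b)); // prints "true"

        // For possibly-empty fields, isEmpty() avoids comparing to "" entirely:
        System.out.println(a.isEmpty()); // prints "false"
    }
}
```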

Re: Strange behavior on filter, group and reduce DataSets

simone

Sorry, I translated the code into pseudocode too fast. That is indeed an equals.




Re: Strange behavior on filter, group and reduce DataSets

Flavio Pompermaier
Any help on this? This is very strange: the "manual" union of the outputs of the two datasets differs from the Flink union of them.
Could it be a problem with the Flink optimizer?

Best,
Flavio





Re: Strange behavior on filter, group and reduce DataSets

Fabian Hueske-2
Hi,

Union is actually a very simple operator (not even an operator in Flink terms). It just merges two inputs; there is no additional logic involved.
Therefore, it cannot emit records before either of the two ReduceFunctions has sorted its data.
Once the data has been sorted for a ReduceFunction, it is reduced and emitted in a pipelined fashion, i.e., as soon as the first record is reduced, it is forwarded into the MapFunction (passing through the unioned inputs).
So it is not unexpected that the Map starts processing before the ReduceFunctions terminate.

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse objects, you have to be careful in how you implement your functions.
If no, can you share the plan (ExecutionEnvironment.getExecutionPlan()) that was generated for the program?

Thanks,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
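As a hypothetical illustration of the hazard behind this question (plain Java, not the Flink API): when a runtime reuses one mutable record object, a function that keeps a reference to its input instead of copying it sees the kept value silently overwritten by later records.

```java
import java.util.Arrays;
import java.util.List;

public class ObjectReuseHazard {
    // Minimal mutable POJO standing in for the thread's POJO type.
    static class Pojo {
        String field1;
    }

    // Simulates a runtime that deserializes every record into the SAME object
    // and a function that (unsafely) keeps a reference without copying.
    static String unsafeFirst(List<String> values) {
        Pojo reused = new Pojo();
        Pojo kept = null;
        for (String v : values) {
            reused.field1 = v;        // the runtime overwrites the shared object
            if (kept == null) {
                kept = reused;        // BUG: keeps a reference, not a copy
            }
        }
        return kept.field1;           // reflects the LAST record, not the first
    }

    // Safe variant: copy the needed value out of the reused object.
    static String safeFirst(List<String> values) {
        Pojo reused = new Pojo();
        String kept = null;
        for (String v : values) {
            reused.field1 = v;
            if (kept == null) {
                kept = reused.field1; // copies the (immutable) String reference
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        System.out.println(unsafeFirst(records)); // prints "c" (corrupted)
        System.out.println(safeFirst(records));   // prints "a" (expected)
    }
}
```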





Re: Strange behavior on filter, group and reduce DataSets

simone

Hi Fabian,

Object reuse is not enabled. I have attached the execution plan.

Thanks,
Simone



Attachment: plan.txt (156K)

Re: Strange behavior on filter, group and reduce DataSets

simone

Hi Fabian,

I have an update. Forcing a rebalance after the union (ids 5 and 86 in the previous plan), the output meets the expectations:

DataSet<POJO> ds3 = ds1.union(ds2);
ds3 = ds3.rebalance();

The newly produced plan (and the old one) are attached to this mail. I still don't understand why, without the rebalance, the execution shows that strange behavior. Any idea?

Thanks,
Simone.



Attachments: oldplan.txt (156K), newplan.txt (156K)

Re: Strange behavior on filter, group and reduce DataSets

Fabian Hueske-2
In reply to this post by simone
Hi Simone,

Looking at the plan, I don't see why this should be happening. The pseudo code looks fine as well.
Any chance that you can create a minimal program to reproduce the problem?

Thanks,
Fabian




Re: Strange behavior on filter, group and reduce DataSets

Fabian Hueske-2
Ah, thanks for the update!
I'll have a look at that.




Re: Strange behavior on filter, group and reduce DataSets

Fabian Hueske-2
Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is replicated, filtered, reduced, and unioned.
There is nothing between the filter and the reduce that could cause incorrect behavior.

The good news is that the optimizer seems to be fine. The bad news is that the cause is either the Flink runtime code or your functions.
Given that one plan produces correct results, it might be the Flink runtime code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian




Re: Strange behavior on filter, group and reduce DataSets

simone

Hi Fabian,

This simple code reproduces the behavior: https://github.com/xseris/Flink-test-union

Thanks, Simone.





Re: [ANNOUNCE] Apache Flink 1.3.3 released

Philip Luppens
In reply to this post by Stephan Ewen
Hi everyone,

Thanks, but I don’t see the binaries for 1.3.3 pushed anywhere in the Maven repositories [1]. Can we expect them to show up there as well eventually?

[1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-java/


Kind regards,

-Phil



--
"We cannot change the cards we are dealt, just how we play the hand." - Randy Pausch

Re: [ANNOUNCE] Apache Flink 1.3.3 released

Chesnay Schepler
Whoops, looks like we forgot to push the release button :)

Thank you for notifying us. The artifacts should be available soon.



Re: Strange behavior on filter, group and reduce DataSets

Fabian Hueske-2
In reply to this post by simone
Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think could help to resolve the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031

2018-03-19 16:35 GMT+01:00 simone <[hidden email]>:

Hi Fabian,

This simple code reproduces the behavior -> https://github.com/xseris/Flink-test-union

Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:
Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is replicated, filtered, reduced, and unioned.
There is nothing in between the filter and reduce, that could cause incorrect behavior.

The good thing is, the optimizer seems to be fine. The bad thing is, it is either the Flink runtime code or your functions.
Given that one plan produces good results, it might be the Flink runtime code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske <[hidden email]>:
Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske <[hidden email]>:
HI Simone,

Looking at the plan, I don't see why this should be happening. The pseudo code looks fine as well.
Any chance that you can create a minimal program to reproduce the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone <[hidden email]>:

Hi Fabian,

reuse is not enabled. I attach the plan of the execution.

Thanks,
Simone


On 19/03/2018 11:36, Fabian Hueske wrote:
Hi,

Union is actually a very simple operator (not even an operator in Flink terms). It just merges to inputs. There is no additional logic involved.
Therefore, it should also not emit records before either of both ReduceFunctions sorted its data.
Once the data has been sorted for the ReduceFunction, the data is reduced and emitted in a pipelined fashion, i.e., once the first record is reduced, it is forwarded into the MapFunction (passing the unioned inputs).
So it is not unexpected that Map starts processing before the ReduceFunction terminated.

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse objects, you have to be careful in how you implement your functions.
If no, can you share the plan (ExecutionEnvironment.getExecutionPlan()) that was generated for the program?

Thanks,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions



2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <[hidden email]>:
Any help on this? This is very strange: the "manual" union of the outputs of the two DataSets differs from Flink's union of them.
Could it be a problem of the Flink optimizer?

Best,
Flavio

On Fri, Mar 16, 2018 at 4:01 PM, simone <[hidden email]> wrote:

Sorry, I translated the code into pseudocode too quickly. That is indeed an equals.


On 16/03/2018 15:58, Kien Truong wrote:

Hi,

Just a guess, but string comparison in Java should use the equals method, not the == operator.

Regards,

Kien


On 3/16/2018 9:47 PM, simone wrote:
subject.getField("field1") == "";
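For reference, the difference Kien points out, as a standalone sketch (the field value here is illustrative, not taken from Simone's code): == compares references, equals compares contents.

```java
public class StringCompare {
    public static void main(String[] args) {
        // Build the string at runtime so it is a distinct object
        // from the interned "" literal.
        String field = new String("");

        System.out.println(field == "");       // false: reference comparison
        System.out.println(field.equals(""));  // true: content comparison
        System.out.println(field.isEmpty());   // true: idiomatic emptiness check
    }
}
```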










Reply | Threaded
Open this post in threaded view
|

Re: Strange behavior on filter, group and reduce DataSets

Stephan Ewen
To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final fields, no setters, set fields in the constructor instead). Does that make the problem go away?

  - Change the Person data type to not be a POJO by adding a dummy field that is never used and has no getter/setter. Does that make the problem go away?

If either of those is the case, it must be a mutability bug somewhere, in either accidental object reuse or accidental serializer sharing.
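A minimal sketch of the first suggestion (the field names are hypothetical; the real Person type lives in Simone's repro code): all fields final, constructor-only initialization, no setters, so no function can mutate a record after it has been handed on.

```java
// Immutable variant of the Person data type.
// Note: without a no-arg constructor and setters this is no longer a
// Flink POJO, so Flink would fall back to a generic serializer for it.
public final class Person {
    private final String name;
    private final String field1;

    public Person(String name, String field1) {
        this.name = name;
        this.field1 = field1;
    }

    public String getName() { return name; }
    public String getField1() { return field1; }

    // "Mutation" creates a new instance instead of changing this one.
    public Person withField1(String newValue) {
        return new Person(name, newValue);
    }
}
```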


On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <[hidden email]> wrote:
Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think could help to resolve the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031

2018-03-19 16:35 GMT+01:00 simone <[hidden email]>:

Hi Fabian,

This simple code reproduces the behavior -> https://github.com/xseris/Flink-test-union

Thanks, Simone.



Reply | Threaded
Open this post in threaded view
|

Re: Strange behavior on filter, group and reduce DataSets

simone

Hi all,

an update: following Stephan's directives on how to diagnose the issue, after making Person immutable the problem no longer occurs.

Simone.

