The Apache Flink community is very happy to announce the release of Apache Flink 1.3.3, which is the third bugfix release for the Apache Flink 1.3 series.

Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for this bugfix release:
https://flink.apache.org/news/2018/03/15/release-1.3.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12341142

We would like to thank all contributors of the Apache Flink community who made this release possible!

Cheers,
Gordon
Thanks for managing this release Gordon!

Cheers, Fabian

2018-03-15 21:24 GMT+01:00 Tzu-Li (Gordon) Tai <[hidden email]>:
Thanks for managing the release Gordon and also thanks to the community!

Cheers,
Till

On Fri, Mar 16, 2018 at 9:05 AM, Fabian Hueske <[hidden email]> wrote:
|
This release fixed a quite critical bug that could lead to loss of checkpoint state:
https://issues.apache.org/jira/browse/FLINK-7783

We recommend that all users on Flink 1.3.2 upgrade to 1.3.3.

On Fri, Mar 16, 2018 at 10:31 AM, Till Rohrmann <[hidden email]> wrote:
|
Hi all,
I am using Flink 1.3.1 and I have found a strange behavior when running the logic below. Strangely, the problem does not happen when the union between the two datasets is not computed: in that case the first dataset produces only elements with distinct values of "field1", while the second dataset produces only records with an empty "field1". Debugging the code, it seems that the map function used to convert the last merged dataset into JSON strings starts before the reduce functions terminate. This seems to produce duplicates.

Here is my pseudocode:

    DataSet<POJO> subjects = ... // read from CSV

    DataSet<POJO> subjectsWithCondition = subjects
        .filter(new FilterFunction<POJO>() {
            @Override
            public boolean filter(POJO subject) throws Exception {
                return subject.getField("field1") != "";
            }
        })
        .groupBy("field1")
        .reduce(new ReduceFunction<POJO>() {
            @Override
            public POJO reduce(POJO subject1, POJO subject2) {
                return subject1;
            }
        });

    DataSet<POJO> subjectsWithoutCondition = subjects
        .filter(new FilterFunction<POJO>() {
            @Override
            public boolean filter(POJO subject) throws Exception {
                return subject.getField("field1") == "";
            }
        })
        .groupBy("field2")
        .reduce(new ReduceFunction<POJO>() {
            @Override
            public POJO reduce(POJO subject1, POJO subject2) {
                return subject1;
            }
        });

    DataSet<POJO> allSubjects = subjectsWithCondition.union(subjectsWithoutCondition);

    DataSet<String> jsonSubjects = allSubjects.map(new RichMapFunction<POJO, String>() {
        private static final long serialVersionUID = 1L;
        ObjectMapper mapper = new ObjectMapper();

        @Override
        public String map(POJO subject) throws Exception {
            return mapper.writeValueAsString(subject);
        }
    });

    jsonSubjects.writeAsText("/tmp/subjects/", WriteMode.OVERWRITE);
    env.execute("JSON generation");

What is the problem? Did I make some mistake in the filtering, grouping, or reducing logic?

Thanks in advance,
Simone. |
Hi,

Just a guess, but string comparison in Java should use the equals() method, not the == operator.

Regards,
Kien
On 3/16/2018 9:47 PM, simone wrote:
subject.getField("field1") == ""; |
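A tiny self-contained sketch of Kien's point (helper and variable names are illustrative, not from the original code): `==` on Strings compares object identity, while `equals()` compares contents, so a filter written with `==` can behave differently from the same filter written with `equals()`.

```java
// == on Strings compares references; equals() compares contents.
// Two strings with identical characters need not be the same object.
final class StringCompareDemo {
    static boolean sameReference(String a, String b) {
        return a == b;
    }

    static boolean sameContent(String a, String b) {
        return a != null && a.equals(b);
    }

    public static void main(String[] args) {
        String literal = "";              // interned string literal
        String computed = new String(""); // same content, distinct object
        System.out.println(sameReference(literal, computed)); // false
        System.out.println(sameContent(literal, computed));   // true
    }
}
```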
Sorry, I translated the code into pseudocode too fast. That is indeed an equals.

On 16/03/2018 15:58, Kien Truong wrote:
|
Any help on this? This thing is very strange: the "manual" union of the outputs of the 2 datasets is different from the Flink union of them. Could it be a problem of the Flink optimizer?

Best,
Flavio

On Fri, Mar 16, 2018 at 4:01 PM, simone <[hidden email]> wrote:
|
Hi,

Union is actually a very simple operator (not even an operator in Flink terms). It just merges its two inputs; there is no additional logic involved. Therefore, it should also not emit records before either of the two ReduceFunctions has sorted its data. Once the data has been sorted for the ReduceFunction, the data is reduced and emitted in a pipelined fashion, i.e., once the first record is reduced, it is forwarded into the MapFunction (passing the unioned inputs). So it is not unexpected that Map starts processing before the ReduceFunctions terminate.

Did you enable object reuse? If yes, try to disable it: if you want to reuse objects, you have to be careful in how you implement your functions. If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())?

[1] https://ci.apache.org/

2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <[hidden email]>:
|
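As a minimal plain-Java sketch of the object-reuse hazard Fabian asks about (no Flink involved; class, field, and method names are illustrative, not from the original job): a runtime that reuses one record instance hands the same object to user functions repeatedly, so a reduce function that keeps a reference to its input, instead of copying it, sees that value silently change when the instance is overwritten.

```java
// Sketch of the object-reuse failure mode with mutable records:
// keeping a reference to a shared, reused instance lets a later
// mutation corrupt the previously "reduced" value; copying avoids it.
final class ReuseDemo {
    static final class Pojo {
        String field1;
        Pojo(String field1) { this.field1 = field1; }
    }

    // Unsafe under object reuse: returns a reference to a shared instance.
    static Pojo reduceKeepReference(Pojo left, Pojo right) {
        return left;
    }

    // Safe under object reuse: defensively copies the value it keeps.
    static Pojo reduceWithCopy(Pojo left, Pojo right) {
        return new Pojo(left.field1);
    }

    public static void main(String[] args) {
        Pojo reused = new Pojo("A"); // the runtime's single reused instance
        Pojo kept = reduceKeepReference(reused, reused);
        Pojo copied = reduceWithCopy(reused, reused);
        reused.field1 = "B";         // runtime writes the next record into it
        System.out.println(kept.field1 + " " + copied.field1); // prints "B A"
    }
}
```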
Hi Fabian,

Reuse is not enabled. I attach the plan of the execution.

Thanks,

On 19/03/2018 11:36, Fabian Hueske wrote:

[Attachment: plan.txt (156K)] |
Hi Fabian,

I have an update. Forcing a rebalance after the union (ids 5, 86 of the previous plan), the output meets the expectations:

    DataSet<POJO> ds3 = ds1.union(ds2).rebalance();

The new produced plan (and the old one) is attached to this mail. I still don't understand why, without rebalancing, the execution has that strange behavior. Any idea?

Thanks,

On 19/03/2018 12:04, simone wrote:
|
In reply to this post by simone
Hi Simone,

Looking at the plan, I don't see why this should be happening. The pseudo code looks fine as well.

2018-03-19 12:04 GMT+01:00 simone <[hidden email]>:
|
Ah, thanks for the update! I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske <[hidden email]>:
|
Hmmm, I still don't see the problem. IMO, the result should be correct for both plans. The data is replicated, filtered, reduced, and unioned. There is nothing between the filter and reduce that could cause incorrect behavior.

2018-03-19 15:15 GMT+01:00 Fabian Hueske <[hidden email]>:
|
Hi Fabian,

This simple code reproduces the behavior -> https://github.com/xseris/Flink-test-union

Thanks,
Simone.

On 19/03/2018 15:44, Fabian Hueske wrote:
|
In reply to this post by Stephan Ewen
Hi everyone,

Thanks, but I don’t see the binaries for 1.3.3 being pushed anywhere in the Maven repositories [1]. Can we expect them to show up over there as well eventually?

[1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-java/

Kind regards,

-Phil

On Fri, Mar 16, 2018 at 3:36 PM, Stephan Ewen <[hidden email]> wrote:
"We cannot change the cards we are dealt, just how we play the hand." - Randy Pausch
|
Whoops, looks like we forgot to push the release button :)
Thank you for notifying us. The artifacts should be available soon.

On 20.03.2018 11:35, Philip Luppens wrote:
> Thanks, but I don’t see the binaries for 1.3.3 being pushed anywhere in the
> Maven repositories [1]. Can we expect them to show up over there as well
> eventually?
>
> [1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-java/ |
In reply to this post by simone
Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.

[1] https://issues.apache.org/jira/browse/FLINK-9031

2018-03-19 16:35 GMT+01:00 simone <[hidden email]>:
|
To diagnose that, can you please check the following:

- Change the Person data type to be immutable (final fields, no setters, set fields in the constructor instead). Does that make the problem go away?
- Change the Person data type to not be a POJO by adding a dummy field that is never used, but does not have a getter/setter. Does that make the problem go away?

If either of those is the case, it must be a mutability bug somewhere, in either accidental object reuse or accidental serializer sharing.

On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <[hidden email]> wrote:
|
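A sketch of the immutable variant Stephan suggests (final fields, no setters, values fixed at construction); the class and field names are illustrative, not taken from the original job.

```java
// Immutable data type: final fields, no setters, values set only in
// the constructor. Instance reuse cannot corrupt such records because
// nothing can mutate them after they are created.
final class Person {
    private final String field1;
    private final String field2;

    Person(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    String getField1() { return field1; }
    String getField2() { return field2; }
}
```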
Hi all,

An update: following Stephan's directives on how to diagnose the issue, after making Person immutable the problem does not occur.

Simone.

On 20/03/2018 20:20, Stephan Ewen wrote:
|