Hi,
We found out that a new stable version, 1.1.0, has been released, but we cannot find any release notes. Does anyone know where to find them? We are experiencing some changes in behavior, and I’m not sure whether they are related. Thanks, Andrew
Confidentiality Notice: This e-mail transmission may contain confidential or legally privileged information that is intended only for the individual or entity named in the e-mail address. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution, or reliance upon the contents of this e-mail is strictly prohibited and may be unlawful. If you have received this e-mail in error, please notify the sender immediately by return e-mail and delete all copies of this message. |
Hi Andrew, here is the release announcement, with a list of all changes: http://flink.apache.org/news/2016/08/08/release-1.1.0.html and http://flink.apache.org/blog/release_1.1.0-changelog.html What does the chart say? Are the results different? Is Flink faster or slower now? Regards, Robert On Tue, Aug 9, 2016 at 11:32 AM, Andrew Ge Wu <[hidden email]> wrote:
|
Hi Robert
Thanks for the quick reply, I guess I’m one of the early birds. Yes, it is much slower, and I’m not sure why. I copied slaves, masters, log4j.properties and flink-conf.yaml directly from 1.0.3. I have a parallelism of 1 on my sources. I can increase that to achieve the same speed, but I’m interested to know why that is. Thanks! Andrew
|
Which source are you using? On Tue, Aug 9, 2016 at 11:50 AM, Andrew Ge Wu <[hidden email]> wrote:
|
We wrote our own source. I noticed that our back pressure changed from ok to high after rebalance().timeWindowAll(). If there is no obvious change in that, the problem may be in our function after it.
|
I rolled back to 1.0.3.
If I understand this correctly, the peak when the topology starts is because it is trying to fill all the buffers, but I cannot see that in 1.1.0.
|
Hi, could you maybe post how exactly you specify the window? Also, did you set a "stream time characteristic", for example EventTime? That could help us pinpoint the problem. Cheers, Aljoscha On Tue, 9 Aug 2016 at 12:42 Andrew Ge Wu <[hidden email]> wrote:
|
Hi Aljoscha
The plan is attached. There are split streams and union operations around it, but here is how the windows are created:
Let me know if I’m doing something out of the ordinary here. Thanks! Andrew
Attachments: PastedGraphic-6.png (69K), plan.json (3K), PastedGraphic-3.png (42K) |
Hi, are you setting a StreamTimeCharacteristic, i.e. env.setStreamTimeCharacteristic? Cheers, Aljoscha On Tue, 9 Aug 2016 at 14:52 Andrew Ge Wu <[hidden email]> wrote:
|
Oh sorry, I missed that part. No, I’m not setting that explicitly.
|
Hi, could you maybe send us the output of "env.getExecutionPlan()". This would help us better understand which operators are used exactly. (You can of course remove any security sensitive stuff.) Cheers, Aljoscha On Tue, 9 Aug 2016 at 15:30 Andrew Ge Wu <[hidden email]> wrote:
|
Oh, are you by any chance specifying a custom state backend for your job? For example, RocksDBStateBackend. Cheers, Aljoscha On Wed, 10 Aug 2016 at 11:17 Aljoscha Krettek <[hidden email]> wrote:
|
Hi Aljoscha
We are not using a state backend explicitly; recovery and the state backend are pointed to a file path. See the attached JSON file.
Thanks for the help. Best regards, Andrew
Attachment: plan.json (3K) |
Hi Andrew!

Here is the reason for what is happening with your job: you have been relying on an undocumented and unofficial corner-case behavior of Flink 1.0.0, namely a parallel windowAll(). Initially, windowAll() was not supposed to be parallel, but the system did not prevent setting a parallelism on it. In Flink 1.0.0 it just happened that a parallel windowAll() behaved like a "window over stream partition". In Flink 1.1.0, the parallel windowAll() really sends all data to one of the parallel operators, and the others are idle.

Admittedly, Flink 1.1.0 should simply not allow setting a parallelism on windowAll() - we will fix that.

What we need to figure out now is how to provide an adequate replacement for the "window over stream partition" use case. I think we need to add an explicit "windowPartition()" function for that case.

Until then, you could stay on Flink 1.0.3, or you can try using a "keyBy().window()" operator instead of "windowAll()", with an incrementing number%24 as the key (it would not be perfectly balanced, but it is a temporary workaround):

    stream
      .keyBy(new KeySelector<SocialData, Integer>() {
        // counter that cycles through the keys 0..23
        private int key;

        @Override
        public Integer getKey(SocialData data) {
          if (++key >= 24) {
            key = 0;
          }
          return key;
        }
      })
      .timeWindow(Time.milliseconds(100))
      .apply(...)

Sorry for the inconvenience!

Greetings,
Stephan

On Wed, Aug 10, 2016 at 1:15 PM, Andrew Ge Wu <[hidden email]> wrote:
|
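The cycling key in the workaround above can be tried out without a cluster. The following is only a rough, Flink-free sketch of the counter logic (`++key >= 24`); the names `RoundRobinKeys`, `nextKey` and `countPerKey` are made up for illustration and are not part of Flink. It does not model how Flink assigns each key to a parallel window operator, which is presumably where the imbalance mentioned above comes from.

```java
import java.util.Arrays;

// Plain-Java sketch (no Flink dependency) of the cycling key used in the workaround.
// All names here are hypothetical and exist only for this illustration.
final class RoundRobinKeys {
    private final int numKeys;
    private int key; // last key handed out, like the counter in the KeySelector

    RoundRobinKeys(int numKeys) {
        if (numKeys <= 0) {
            throw new IllegalArgumentException("numKeys must be positive");
        }
        this.numKeys = numKeys;
    }

    // Same update rule as in the workaround: pre-increment, wrap to 0 at numKeys.
    int nextKey() {
        if (++key >= numKeys) {
            key = 0;
        }
        return key;
    }

    // Counts how many of `records` consecutive elements are assigned to each key.
    static int[] countPerKey(int records, int numKeys) {
        RoundRobinKeys assigner = new RoundRobinKeys(numKeys);
        int[] counts = new int[numKeys];
        for (int i = 0; i < records; i++) {
            counts[assigner.nextKey()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 2400 elements spread over 24 keys
        System.out.println(Arrays.toString(countPerKey(2400, 24)));
    }
}
```

With a record count that is a multiple of 24, every key receives the same number of elements, so any skew between parallel operators would have to come from the key-to-operator assignment rather than from the counter itself.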
Hi Stephan
Thanks for the explanation! We will stick to 1.0.3 to keep our code clean. In the workaround case, how is the key selector instantiated? One instance per window operator? By the way, is there a way to create a hybrid window of count and time, like 50 items or a max processing time of 100 ms? Thanks! Andrew
|
Hi!

In the above example the keySelector would run once before and once inside the window operator. In that sense, the version below is a better way to do it.

You can also create windows of 50 elements or at most 100 ms by writing your own trigger. Have a look at the count trigger. You can augment it by scheduling a time callback for 100 ms to trigger the window.

The better version of the "random key" program:

    stream
      .map(new MapFunction<SocialData, Tuple2<Integer, SocialData>>() {
        // counter that cycles through the keys 0..23
        private int key;

        @Override
        public Tuple2<Integer, SocialData> map(SocialData data) {
          if (++key >= 24) {
            key = 0;
          }
          // field 0 holds the key, which is what keyBy(0) below refers to
          return new Tuple2<>(key, data);
        }
      })
      .keyBy(0)
      .timeWindow(Time.milliseconds(...))
      .apply(...)

Greetings,
Stephan

On Wed, Aug 10, 2016 at 3:54 PM, Andrew Ge Wu <[hidden email]> wrote:
|
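The "50 items or at most 100 ms" rule suggested above can be sketched in plain Java to make the firing logic concrete. This is not Flink's Trigger API and not the count trigger itself; `CountOrTimeoutBatcher`, `onElement` and `onTime` are hypothetical names, and time is passed in explicitly (as an assumption, to keep the example deterministic) instead of using real timer callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Plain-Java sketch of "fire after maxCount elements or after maxDelayMs,
// whichever comes first". Hypothetical names; not part of Flink.
final class CountOrTimeoutBatcher<T> {
    private final int maxCount;
    private final long maxDelayMs;
    private final List<T> buffer = new ArrayList<>();
    private long deadlineMs = Long.MAX_VALUE; // no timer while the buffer is empty

    CountOrTimeoutBatcher(int maxCount, long maxDelayMs) {
        if (maxCount <= 0 || maxDelayMs <= 0) {
            throw new IllegalArgumentException("maxCount and maxDelayMs must be positive");
        }
        this.maxCount = maxCount;
        this.maxDelayMs = maxDelayMs;
    }

    // Called for every element; returns the batch if the count limit was reached.
    Optional<List<T>> onElement(T element, long nowMs) {
        if (buffer.isEmpty()) {
            deadlineMs = nowMs + maxDelayMs; // timer starts with the first element
        }
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            return Optional.of(flush());
        }
        return Optional.empty();
    }

    // Stand-in for a timer callback; returns the batch if the deadline has passed.
    Optional<List<T>> onTime(long nowMs) {
        if (!buffer.isEmpty() && nowMs >= deadlineMs) {
            return Optional.of(flush());
        }
        return Optional.empty();
    }

    // Hands out the buffered elements and resets count and timer.
    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        deadlineMs = Long.MAX_VALUE;
        return batch;
    }

    public static void main(String[] args) {
        CountOrTimeoutBatcher<Integer> batcher = new CountOrTimeoutBatcher<>(50, 100);
        for (int i = 1; i <= 50; i++) {
            batcher.onElement(i, 0)
                   .ifPresent(b -> System.out.println("count fired with " + b.size() + " items"));
        }
        batcher.onElement(51, 1000);
        batcher.onTime(1100)
               .ifPresent(b -> System.out.println("timer fired with " + b.size() + " items"));
    }
}
```

In a real custom trigger the `onTime` role would be played by the time callback mentioned above, and resetting the count and the timer after firing is a design choice of this sketch rather than something stated in the thread.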
I’ll wait for your next great release. Cheers! Andrew
|