Hi All,
I need to aggregate some field of my events. At first I used keyBy(), but I found that Flink performs very slowly (it even stops producing results), because the number of keys is around half a million per minute. So I used windowAll() instead, and Flink then works as expected.

The keyBy() upon the field would generate a unique key per field value, so if the number of unique values is huge, Flink would have trouble with both CPU and memory. Is this considered in the design of Flink?

Since the parallelism of windowAll() cannot be raised, I tried a key selector that returns a hash of the field rather than its value, hoping that it would decrease the number of keys, but Flink throws a key out-of-range exception. How do I use a key selector in the correct way?

In Storm, we could achieve this goal with ease: use fieldsGrouping to connect the spout and the bolt.
Hey Jinhua,
On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <[hidden email]> wrote:
> The keyBy() upon the field would generate a unique key per field
> value, so if the number of unique values is huge, Flink would have
> trouble with both CPU and memory. Is this considered in the design of
> Flink?

Yes, keyBy hash partitions the data across the nodes of your Flink application and thus you can easily scale your application up if you need more processing power.

I'm not sure that this is the problem in your case though. Can you provide some more details on what you are doing exactly? Are you aggregating by time (for the keyBy you mention no windowing, but then you mention windowAll)? What kind of aggregation are you doing? If possible, feel free to share some code.

> Since the parallelism of windowAll() cannot be raised, I tried a key
> selector that returns a hash of the field rather than its value,
> hoping that it would decrease the number of keys, but Flink throws a
> key out-of-range exception. How do I use a key selector in the
> correct way?

Can you paste the exact exception you get? I think this might indicate that you don't correctly extract the key from your record, e.g. you extract a different key on sender and receiver.

I'm sure we can figure this out after you provide more context. :-)

– Ufuk
Does keyBy() on a field generate as many keys as the field has unique values? For example, if the field takes values in {"a", "b", "c"}, then the number of keys is 3, correct?

The field in my case has half a million unique values (IP addresses), so would keyBy() on the field followed by timeWindow() generate half a million partitions?

If I use a key selector instead, e.g.

    .keyBy(new KeySelector<MyEvent, Long>() {
        public Long getKey(MyEvent ev) { return ev.hashCode() % 137L; }
    })

then the number of partitions would be limited to 137, correct?

2017-12-28 17:33 GMT+08:00 Ufuk Celebi <[hidden email]>:
> Yes, keyBy hash partitions the data across the nodes of your Flink
> application and thus you can easily scale your application up if you
> need more processing power.
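A hedged sketch of the bucketing idea in the question above, written as plain Java so that it runs without Flink on the classpath. The class `IpBuckets` and the method `bucketOf` are hypothetical names; the method body is what a `KeySelector<MyEvent, Integer>` could return from `getKey`, assuming the event exposes its IP as a `String`. It hashes the field rather than the event object, because `String.hashCode()` is specified and gives the same result on every JVM, whereas an event class that does not override `hashCode()` does not. It also uses `Math.floorMod`, because `%` returns negative values for negative hash codes.

```java
import java.util.HashSet;
import java.util.Set;

final class IpBuckets {
    // Deterministic bucket in [0, numBuckets): same IP string, same bucket, on any JVM.
    static int bucketOf(String ip, int numBuckets) {
        return Math.floorMod(ip.hashCode(), numBuckets);
    }

    public static void main(String[] args) {
        // Generate 100,000 synthetic IPs and count how many distinct keys they collapse to.
        Set<Integer> seen = new HashSet<>();
        for (int i = 0; i < 100_000; i++) {
            String ip = "10." + ((i >> 16) & 255) + "." + ((i >> 8) & 255) + "." + (i & 255);
            seen.add(bucketOf(ip, 137));
        }
        System.out.println("distinct keys: " + seen.size());
    }
}
```

With such a selector, one key covers many IP addresses, so the window function would still have to keep the per-IP aggregates apart itself (for example in a map), much like the windowAll() variant but spread over several parallel instances.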
In reply to this post by Ufuk Celebi
It's very strange: when I change the key selector to return a random key, the JVM reports an OOM.

    .keyBy(new KeySelector<MyEvent, Integer>() {
        public Integer getKey(MyEvent ev) { return ThreadLocalRandom.current().nextInt(1, 100); }
    })

Caused by: java.lang.OutOfMemoryError: Java heap space
    at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
    at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
    at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

Could anybody explain the internals of keyBy()?
I misused the key selector. I checked the docs and found that it must return a deterministic key, so using a random one is wrong, but I still cannot understand why it would cause an OOM.

2017-12-28 21:57 GMT+08:00 Jinhua Luo <[hidden email]>:
> It's very strange, when I change the key selector to use random key,
> the jvm reports oom.
I took some time to read the source code for keyed stream windowing, and this is my understanding:

a) The keyed stream is split and dispatched to downstream tasks by hash, and the hash base is the maximum parallelism of the downstream operator:

See org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int, int):
    MathUtils.murmurHash(keyHash) % maxParallelism;

That's what the docs call "hash partitioning".

So the compiled execution graph already determines which operator instance receives which key groups.

b) With windowing, the key is used to index the window state, so the window function receives the deserialized value from the window state of the corresponding key.

b.1) The element is first added to the state:

See org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord<IN>):
    windowState.add(element.getValue());

b.2) When the trigger fires the window, the value is deserialized from the keyed state:

    ACC contents = windowState.get();
    emitWindowContents(actualWindow, contents);

With the RocksDB backend, each input element is written to and read back from disk during processing.

Flink's keyed stream has the same functionality as Storm's fields grouping, but is more complicated.

Am I correct?

But I still cannot understand why keyBy() stops Flink from returning the expected results.

Let me explain my case in more detail:
I use a Kafka data source, which collects log lines from log files on tens of machines.
Each log line is in JSON format and contains an "ip" field, the IP address of the user, so it can take any of millions of IP addresses on the Internet.
The stream processing is expected to aggregate by IP over a sliding window of {1 hour, 1 min}.

If I use keyBy("ip"), then for the first few minutes Flink gives me correct aggregation results, but soon afterwards no results are produced, and Flink seems busy doing something forever.

I doubt whether keyBy() can handle a huge number of keys as in this case. When I remove keyBy().window().fold() and use windowAll().fold() instead (the latter fold operator uses a HashMap to aggregate by IP itself), Flink works. But as is known, windowAll() is not scalable.

Could the Flink developers help me with this topic? I prefer Flink and I believe Flink is one of the best stream processing frameworks, but I am really frustrated that Flink could not fulfill its features as the docs say.

Thank you all.

2017-12-29 17:42 GMT+08:00 Jinhua Luo <[hidden email]>:
> I misuse the key selector. I checked the doc and found it must return
> deterministic key, so using random is wrong, but I still could not
> understand why it would cause oom.
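The routing described in point (a) can be sketched in plain Java. This is an illustration under stated assumptions, not Flink's code: `KeyGroupSketch` and its method names are hypothetical, `mix` is a stand-in bit mixer (Flink uses `MathUtils.murmurHash`), and the subtask formula follows what `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup` does as far as I know (`keyGroup * parallelism / maxParallelism`). The point it tries to show is that a key is first mapped to one of `maxParallelism` key groups, and contiguous ranges of key groups are then mapped to the parallel operator instances, so the number of distinct keys changes the amount of state per instance but not the number of partitions.

```java
final class KeyGroupSketch {
    // Stand-in mixer (the MurmurHash3 32-bit finalizer); NOT Flink's MathUtils.murmurHash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Step 1: key -> key group in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // Step 2: key group -> index of the parallel operator instance that owns it.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String ip : new String[] {"10.0.0.1", "10.0.0.2", "192.168.1.7"}) {
            int keyGroup = keyGroupFor(ip, maxParallelism);
            System.out.println(ip + " -> key group " + keyGroup
                    + " -> subtask " + subtaskFor(keyGroup, maxParallelism, parallelism));
        }
    }
}
```

Under this model, a key selector that returns a different key each time it is called for the same record could make the receiving instance compute a key group outside the range it owns, which would be consistent with Ufuk's guess about extracting a different key on sender and receiver.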
> but soon later, no results produced, and flink seems busy doing something forever.

Jinhua, I don't know if you have checked these things. If not, they may be worth a look.

Have you tried to do a thread dump?
How is the GC pause?
Do you see Flink restart? Check the exception tab in the Flink web UI for your job.

On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo <[hidden email]> wrote:
> I take time to read some source codes about the keyed stream
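For the thread-dump and GC questions above, the usual route is to run the JDK tools `jstack` and `jstat` against the TaskManager process. As a rough sketch of the same information seen from inside a JVM, the standard `java.lang.management` beans can be queried as below. The class `JvmHealth` and its method names are hypothetical, and this is not something Flink itself provides in this form.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.EnumMap;
import java.util.Map;

final class JvmHealth {
    // Accumulated GC time in milliseconds over all collectors of this JVM.
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // getCollectionTime() returns -1 when the collector does not report it.
            total += Math.max(0L, gc.getCollectionTime());
        }
        return total;
    }

    // Number of live threads per state (RUNNABLE, BLOCKED, WAITING, ...).
    static Map<Thread.State, Integer> threadStates() {
        Map<Thread.State, Integer> counts = new EnumMap<>(Thread.State.class);
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            if (info != null) {
                counts.merge(info.getThreadState(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("GC time so far (ms): " + totalGcMillis());
        System.out.println("threads by state: " + threadStates());
    }
}
```

A GC time that grows almost as fast as wall-clock time, or many threads stuck in BLOCKED, would be the kind of signal the questions above are looking for.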
I checked the logs, but nothing in them indicates what happens.

In fact, in the same app there is another stream whose Kafka source has low traffic. I aggregate a field of that source too, and Flink continuously gives correct results.
So I doubt whether keyBy() handles high traffic well (which affects the number of keys in the key partitions).

2018-01-01 2:04 GMT+08:00 Steven Wu <[hidden email]>:
>> but soon later, no results produced, and flink seems busy doing something
>> forever.
>
> Jinhua, don't know if you have checked these things. if not, maybe worth a
> look.
Hi Jinhua,
did you check the key group assignments? What is the distribution of "MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your data? This also depends on the hashCode of the output of your KeySelector.

keyBy should handle high traffic well, but it is designed for key spaces with thousands or millions of values. If this is not the case, you need to introduce a more artificial key to spread the load more evenly.

Regarding your OutOfMemoryError: I think you are producing elements much faster than the operators after keyBy can process/discard them. Can you explain your job to us in more detail? Are you using event time? How do you aggregate the elements of the windows?

Regards,
Timo

On 1/1/18 at 6:00 AM, Jinhua Luo wrote:
> I checked the logs, but no information indicates what happens.
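One way to run the check Timo suggests is to push a sample of keys through the key-group computation and count records per parallel instance. The sketch below is an assumption-laden stand-in: `KeySkewCheck` and its methods are hypothetical, and `keyGroupOf` uses a simple multiplicative mixer in place of `MathUtils.murmurHash`, so the exact numbers would differ from a real job, while the shape of the check stays the same.

```java
import java.util.Arrays;
import java.util.List;

final class KeySkewCheck {
    // Stand-in for MathUtils.murmurHash(key.hashCode()) % maxParallelism; NOT Flink's hash.
    static int keyGroupOf(Object key, int maxParallelism) {
        int h = key.hashCode() * 0x9E3779B9;
        return Math.floorMod(h ^ (h >>> 16), maxParallelism);
    }

    // Counts how many sample records each parallel instance would receive.
    static long[] recordsPerSubtask(List<String> sampleKeys, int maxParallelism, int parallelism) {
        long[] counts = new long[parallelism];
        for (String key : sampleKeys) {
            int keyGroup = keyGroupOf(key, maxParallelism);
            counts[keyGroup * parallelism / maxParallelism]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // One "hot" IP appears three times; all of its records go to the same instance.
        List<String> sample = Arrays.asList(
                "1.1.1.1", "1.1.1.1", "1.1.1.1", "8.8.8.8", "9.9.9.9", "10.0.0.7");
        System.out.println(Arrays.toString(recordsPerSubtask(sample, 128, 4)));
    }
}
```

If one instance receives far more records than the others on real sample data, the load is skewed by a few hot keys rather than by the sheer number of keys.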
Side note: Sliding windows can be quite expensive if the slide is small compared to the size. Flink will treat each "slide" as a separate window, so in your case you will get 60 * num_keys windows, which can become quite big.
Best,
Aljoscha

> On 2. Jan 2018, at 17:41, Timo Walther <[hidden email]> wrote:
>
> Hi Jinhua,
>
> did you check the key group assignments? What is the distribution of "MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your data? This also depends on the hashCode of the output of your KeySelector.
>
> keyBy should handle high traffic well, but it is designed for key spaces with thousands or millions of values. If this is not the case, you need to introduce an additional artificial key to spread the load more evenly.
>
> Regarding your OutOfMemoryError: I think you are producing elements much faster than the operators after keyBy can process/discard them. Can you explain your job to us in more detail? Are you using event-time? How do you aggregate the elements of the windows?
>
> Regards,
> Timo
>
> On 1/1/18 at 6:00 AM, Jinhua Luo wrote:
>> I checked the logs, but nothing in them indicates what happens.
>>
>> In fact, in the same app there is another stream whose kafka source is low traffic. I aggregate some field of that source too, and flink gives correct results continuously.
>> So I suspect that keyby() cannot handle high traffic well (which affects the number of keys in the key partitions).
>>
>> 2018-01-01 2:04 GMT+08:00 Steven Wu <[hidden email]>:
>>>> but soon later, no results produced, and flink seems busy doing something forever.
>>>
>>> Jinhua, don't know if you have checked these things. If not, maybe worth a look.
>>>
>>> Have you tried to do a thread dump?
>>> How is the GC pause?
>>> Do you see flink restart? Check the exception tab in the Flink web UI for your job.
>>>
>>> On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo <[hidden email]> wrote:
>>>> I took some time to read the source code for keyed stream windowing, and this is my understanding:
>>>>
>>>> a) the keyed stream is split and dispatched to downstream tasks by hash, and the hash base is the parallelism of the downstream operator:
>>>>
>>>> See org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int, int):
>>>> MathUtils.murmurHash(keyHash) % maxParallelism;
>>>>
>>>> That's what the doc calls "hash partitioning".
>>>>
>>>> So the compiled execution graph already determines which operator instance receives which key groups.
>>>>
>>>> b) with windowing, the key is used to index the window states, so the window function receives the deserialized value from the window state of the corresponding key.
>>>>
>>>> b.1) The element is added to the state first:
>>>>
>>>> See org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord<IN>):
>>>> windowState.add(element.getValue());
>>>>
>>>> b.2) when the trigger fires the window, the value is deserialized from the keyed state:
>>>>
>>>> ACC contents = windowState.get();
>>>> emitWindowContents(actualWindow, contents);
>>>>
>>>> For the rocksdb backend, each input element is moved back and forth between memory and disk during processing.
>>>>
>>>> flink's keyed stream has the same functionality as storm's field grouping, but is more complicated.
>>>>
>>>> Am I correct?
>>>>
>>>> But I still could not understand why keyby() stops flink from returning the expected results.
>>>>
>>>> Let me explain my case more:
>>>> I use a kafka data source, which collects log lines from log files on tens of machines.
>>>> Each log line is in json format and contains the "ip" field, the ip address of the user, so it can take millions of values (ip addresses of the Internet).
>>>> The stream processing is expected to produce an ip aggregation in a {1 hour, 1 min} sliding window.
>>>>
>>>> If I use keyBy("ip"), then for the first minutes flink gives me correct aggregation results, but soon afterwards no results are produced, and flink seems busy doing something forever.
>>>>
>>>> I suspect keyby() cannot handle a huge number of keys like in this case. When I remove keyby().window().fold() and use windowAll().fold() instead (the latter fold operator uses a hashmap to aggregate by ip itself), flink works. But as is known, windowAll() is not scalable.
>>>>
>>>> Could the flink developers help me on this topic? I prefer flink and I believe flink is one of the best stream processing frameworks, but I am really frustrated that flink could not fulfill its features as the doc describes.
>>>>
>>>> Thank you all.
>>>>
>>>> 2017-12-29 17:42 GMT+08:00 Jinhua Luo <[hidden email]>:
>>>>> I misused the key selector. I checked the doc and found it must return a deterministic key, so using random is wrong, but I still could not understand why it would cause oom.
>>>>>
>>>>> 2017-12-28 21:57 GMT+08:00 Jinhua Luo <[hidden email]>:
>>>>>> It's very strange, when I change the key selector to use random key, the jvm reports oom.
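To make the side note about sliding windows concrete: with a 1 hour size and a 1 minute slide, every element belongs to 60 overlapping windows, so window state is kept per key and per window. The sketch below is plain Java, not Flink code. `windowStarts` is a hypothetical helper that follows the usual way sliding-window start times are derived (offset 0 assumed), and the key count of 500,000 is taken from the "half a million per min" figure at the top of the thread.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowCount {
    // Start timestamps (ms) of every sliding window that contains `timestamp`,
    // assuming windows are aligned to multiples of `slide` (offset 0).
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60 * 60 * 1000L;  // 1 hour window
        long slide = 60 * 1000L;      // 1 minute slide
        long keys = 500_000L;         // assumption: distinct keys seen per window

        int windowsPerElement = windowStarts(1_514_900_000_000L, size, slide).size();
        System.out.println(windowsPerElement);        // prints 60
        System.out.println(windowsPerElement * keys); // prints 30000000
    }
}
```

Under these assumptions the job holds on the order of 30 million (key, window) entries at a time, which is the "60 * num_keys" figure from the side note.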
In reply to this post by Timo Walther
The app is very simple, please see the code snippet:
https://gist.github.com/kingluo/e06381d930f34600e42b050fef6baedd

I reran the app, and strangely it now produces results continuously.

But it has two new issues:
a) Memory usage is too high: it uses about 8 GB of heap memory! Why? Is it because the traffic is too high?
b) The Redis async I/O is likely to time out, which fails the whole pipeline.
Memory usage should grow linearly with the number of windows you have active at any given time, the number of keys and the number of different window operations you have.
Regarding the async I/O writing to Redis, I see that you give a capacity of 10000, which means that there will possibly be 10000 concurrent connections to Redis. This might be a bit too much, so could you try reducing that to avoid timeouts?

> On 4. Jan 2018, at 09:39, Jinhua Luo <[hidden email]> wrote:
>
> The app is very simple, please see the code snippet:
>
> https://gist.github.com/kingluo/e06381d930f34600e42b050fef6baedd
>
> I reran the app, and strangely it now produces results continuously.
>
> But it has two new issues:
> a) Memory usage is too high: it uses about 8 GB of heap memory! Why? Is it because the traffic is too high?
> b) The Redis async I/O is likely to time out, which fails the whole pipeline.
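The linear-growth rule above can be turned into a rough back-of-the-envelope estimate. The sketch below only multiplies the factors named in the reply. All numbers are assumptions for illustration: 60 active windows per key (1 hour size, 1 minute slide), one million distinct keys, one window operation, and a guessed 100 bytes per (key, window) entry. The real per-entry cost depends on the accumulator type, the serializer and the state backend, so treat the result as an order of magnitude at best.

```java
final class WindowStateEstimate {
    // Rough state size: windows per key * keys * window operations * bytes per entry.
    static long estimateBytes(long activeWindowsPerKey, long keys, long windowOps, long bytesPerEntry) {
        long entries = Math.multiplyExact(Math.multiplyExact(activeWindowsPerKey, keys), windowOps);
        return Math.multiplyExact(entries, bytesPerEntry);
    }

    public static void main(String[] args) {
        long activeWindowsPerKey = 60; // assumption: 1 hour size / 1 minute slide
        long keys = 1_000_000L;        // assumption: distinct IPs in flight
        long windowOps = 1;            // assumption: one windowed aggregation
        long bytesPerEntry = 100;      // assumption: guessed cost of one accumulator plus overhead

        System.out.println(estimateBytes(activeWindowsPerKey, keys, windowOps, bytesPerEntry) + " bytes");
        // prints "6000000000 bytes"
    }
}
```

With these guesses the state alone lands in the gigabyte range, so a multi-gigabyte heap is plausible for this job shape even with incremental aggregation. Lengthening the slide or reducing the number of keys shrinks it proportionally.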
2018-01-04 21:04 GMT+08:00 Aljoscha Krettek <[hidden email]>:
> Memory usage should grow linearly with the number of windows you have active at any given time, the number of keys and the number of different window operations you have.

But the memory usage is still too high, especially considering that incremental aggregation is used.

> Regarding the async I/O writing to redis, I see that you give a capacity of 10000 which means that there will possibly be 10000 concurrent connections to Redis. This might be a bit to much so could you try reducing that to avoid timeouts?

It's not related to that part. In fact, I commented out the async I/O code and tested again, and the memory usage is almost the same.

On the contrary, I need to increase the concurrency number, because I have millions of aggregation results in total to send per minute!
If the number is low, it triggers timeouts (yes, even though the timeout value is 30 seconds; I think it's related to the single-connection model of the lettuce lib).
I mean the timeout most likely happens in the sending queue of the redis lib if the concurrency number is low.

---------org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(StreamRecord<IN>)----
public void processElement(StreamRecord<IN> element) throws Exception {
    final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

    if (timeout > 0L) {
        // register a timeout for this AsyncStreamRecordBufferEntry
        long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

        final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
            timeoutTimestamp,
            new ProcessingTimeCallback() {
                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                    streamRecordBufferEntry.completeExceptionally(
                        new TimeoutException("Async function call has timed out."));
                }
            });

        // Cancel the timer once we've completed the stream record buffer entry. This will remove
        // the register trigger task
        streamRecordBufferEntry.onComplete(
            (StreamElementQueueEntry<Collection<OUT>> value) -> {
                timerFuture.cancel(true);
            },
            executor);
    }

    addAsyncBufferEntry(streamRecordBufferEntry);

    userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}
-------------

The timer is set even while the entry is still waiting for a free queue slot. Is this a bug?
If the timeout fires before addAsyncBufferEntry returns, execution still steps into asyncInvoke anyway, but the "future" is then failed immediately.

2018-01-04 21:31 GMT+08:00 Jinhua Luo <[hidden email]>:
> 2018-01-04 21:04 GMT+08:00 Aljoscha Krettek <[hidden email]>:
>> Memory usage should grow linearly with the number of windows you have active at any given time, the number of keys and the number of different window operations you have.
>
> But the memory usage is still too much, especially when the incremental aggregation is used.
>
>> Regarding the async I/O writing to redis, I see that you give a capacity of 10000 which means that there will possibly be 10000 concurrent connections to Redis. This might be a bit to much so could you try reducing that to avoid timeouts?
>
> It's not related to that part. In fact, I commented the async io codes and test, the memory usage is almost the same.
>
> And, on the contrary, I need to increase the concurrency number, because I have totally millions of aggregation results to sent per min!
> If the number is low, it would trigger timeout (yes, even the timeout value is 30 seconds, I think it's related to the single connection model of lettuce lib).
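The ordering described above (timer registered first, then a blocking wait for a queue slot, then the async call) can be reproduced outside Flink with plain `java.util.concurrent`. This is a simplified model, not the AsyncWaitOperator itself: the class `TimerBeforeEnqueue`, the one-slot queue and the millisecond values are all hypothetical, chosen only to show that an entry can be timed out before its request is ever issued.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimerBeforeEnqueue {
    // Returns true if the entry has already timed out by the time it obtains a queue slot.
    static boolean timedOutWhileWaiting(long timeoutMs, long slotFreeAfterMs) throws InterruptedException {
        ScheduledExecutorService timers = Executors.newScheduledThreadPool(2);
        BlockingQueue<CompletableFuture<String>> queue = new ArrayBlockingQueue<>(1);
        try {
            // The queue starts full; a slot frees up after slotFreeAfterMs.
            queue.put(new CompletableFuture<>());
            timers.schedule(() -> { queue.poll(); }, slotFreeAfterMs, TimeUnit.MILLISECONDS);

            CompletableFuture<String> entry = new CompletableFuture<>();

            // Step 1: the timeout timer is registered before the entry is enqueued.
            timers.schedule(
                () -> { entry.completeExceptionally(new TimeoutException("Async function call has timed out.")); },
                timeoutMs, TimeUnit.MILLISECONDS);

            // Step 2: block until a slot is free (models waiting in addAsyncBufferEntry).
            queue.put(entry);

            // Step 3: this is where the async call would be issued.
            return entry.isCompletedExceptionally();
        } finally {
            timers.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Timeout shorter than the wait for a slot: the entry fails before the call is made.
        System.out.println(timedOutWhileWaiting(100, 400));
        // Timeout much longer than the wait: the entry is still pending when the call is made.
        System.out.println(timedOutWhileWaiting(5000, 50));
    }
}
```

In this model the timeout budget covers queueing time as well as the external call, which matches the observation above that a low capacity can lead to timeouts even with a 30 second limit.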