Hi, we get the following NullPointerException after ~50 minutes when running a streaming job with windowing and state that reads data from Kafka and writes the result to the local FS. There are around 170 million messages to be processed; Flink 0.10.1 stops at ~8 million. Flink runs locally, started with the "start-cluster-streaming.sh" script.

12/01/2015 15:06:24  Job execution switched to status RUNNING.
12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
12/01/2015 15:56:08  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
12/01/2015 15:56:08  Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
    at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
    at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)

Any ideas on what could cause this behaviour? |
Hello,

I am not sure whether this message already reached the user list; if so, I apologize for the duplicate.

I have the following scenario:

- Reading a fixed set: DataStream<String> fixedSet = env.readTextFile(…
- Reading a continuous stream of data: DataStream<String> stream = ….

For each event read from the continuous stream I need to perform some operations on it together with the fixed set. I have tried something like this:

myObject.referenceStaticSet = fixedSet;
stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String arg0) throws Exception {
        // for example:
        final String string2add = arg0;
        // the goal of the function below would be to add string2add to the fixed set
        myObject.referenceStaticSet = myObject.referenceStaticSet.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String arg0, Collector<String> arg1) {
                // for example, also adding the string2add object to the fixed set:
                arg1.collect(string2add);
            }
        });
        …
    }
});

However, I get an exception saying that the object is not serializable:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object MyClass$3@a71081 not serializable

Looking into this, I see that the issue is that the DataStream<> itself is not serializable. What would be the solution to this?

As I mentioned before, I would like, for each event from the continuous stream, to take the initial fixed set, add the event to it and apply an operation. Stephan was mentioning at some point the possibility to create a DataSet and launch a batch job while operating in streaming mode; if this is possible, can you give me a reference for it, because it might be a good solution in this case. I am also thinking that I could keep the fixed set as a DataSet and, as each new event arrives, transform it into a DataSet, join it with the reference set and apply an operation.

Regards,
Radu Tudoran
Research Engineer, HUAWEI TECHNOLOGIES Duesseldorf GmbH
|
In reply to this post by Vieru, Mihail
Hi Mihail,

the issue is actually a bug in Kafka. We have a JIRA issue in Flink for this as well: https://issues.apache.org/jira/browse/FLINK-3067

Sadly, we haven't released a fix for it yet; Flink 0.10.2 will contain one. Since the Kafka connector is not contained in the Flink binary, you can just set the connector version in your Maven pom file to 0.10-SNAPSHOT. Maven will then download the latest code from the 0.10 release branch.

On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <[hidden email]> wrote:
|
Hi, I think Robert meant to write setting the connector dependency to 1.0-SNAPSHOT. Cheers, Gyula Robert Metzger <[hidden email]> ezt írta (időpont: 2015. dec. 1., K, 17:10):
|
Hi Gyula, no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the "release-0.10" branch to Apache's maven snapshot repository. I don't think Mihail's code will run when he's compiling it against 1.0-SNAPSHOT. On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <[hidden email]> wrote:
|
I know this has been fixed already but, out of curiosity, could you
point me to the Kafka JIRA issue for this bug? From the Flink issue it looks like this is a Zookeeper version mismatch.

On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <[hidden email]> wrote: |
I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824

On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <[hidden email]> wrote: |
Thanks! I've linked the issue in JIRA.
On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <[hidden email]> wrote: |
In reply to this post by Radu Tudoran
Hi Radu,

both emails reached the mailing list :)

You cannot reference DataSets or DataStreams from inside user-defined functions. Both are just abstractions for a data set or stream, so the elements are not really contained in those objects. We also don't have any support for mixing the DataSet and DataStream APIs.

For your use case, I would recommend using a RichFlatMapFunction and reading the text file in its open() method (a rough sketch follows below).

On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran <[hidden email]> wrote:
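As an illustration of the suggestion above, a minimal sketch could look like the following. The file path, the element types and the way the fixed set is combined with each event are placeholders, not something prescribed by Flink:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;

public class EnrichWithFixedSet extends RichFlatMapFunction<String, String> {

    private transient Set<String> fixedSet;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read the fixed set once per parallel instance; the path is a placeholder
        // and could also be passed in through the constructor.
        fixedSet = new HashSet<>(Files.readAllLines(
                Paths.get("/path/to/fixedset.txt"), StandardCharsets.UTF_8));
    }

    @Override
    public void flatMap(String event, Collector<String> out) {
        // This local copy can be modified freely; it does not touch any DataStream.
        fixedSet.add(event);
        for (String element : fixedSet) {
            // Placeholder operation: emit the event combined with each element of the set.
            out.collect(event + "," + element);
        }
    }
}

Usage would then simply be stream.flatMap(new EnrichWithFixedSet()).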
|
In reply to this post by Maximilian Michels
Thank you, Robert! The issue with Kafka is now solved with the 0.10-SNAPSHOT dependency.

We have run into an OutOfMemory exception though, which appears to be related to the state. As my colleague, Javier Lopez, mentioned in a previous thread, state handling is crucial for our use case. And as the jobs are intended to run for months, stability plays an important role in choosing a stream processing framework.

12/02/2015 10:03:53  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.resize(HashMap.java:703)
    at java.util.HashMap.putVal(HashMap.java:662)
    at java.util.HashMap.put(HashMap.java:611)
    at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
    at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
    at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

2015-12-01 17:42 GMT+01:00 Maximilian Michels <[hidden email]>: Thanks! I've linked the issue in JIRA. |
It's good news that the issue has been resolved. Regarding the OOM, did you start Flink in streaming mode? On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <[hidden email]> wrote:
|
Yes, with the "start-cluster-streaming.sh" script.

If the TaskManager gets 5GB of heap it manages to process ~100 million messages and then throws the above OOM. If it gets only 500MB it manages to process ~8 million, and a somewhat misleading exception is thrown:

12/01/2015 19:14:07  Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception: Java heap space
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
    at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
    at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)

2015-12-02 10:45 GMT+01:00 Robert Metzger <[hidden email]>:
|
Hi Mihail,
could you please give some information about the number of keys that you are expecting in the data and how big the elements are that you are processing in the window? Also, are there any other operations that could be taxing on memory?

I think the different exception you see for the 500MB heap size is just because Java notices that it ran out of memory at a different part of the program.

Cheers,
Aljoscha

On 02 Dec 2015, at 10:57, Vieru, Mihail <[hidden email]> wrote: |
Hi Aljoscha,

we have no upper bound for the number of expected keys. The maximum size of an element is 1 KB.

There are 2 Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText operator in the job. In the first Map we parse the JSON object contained in each element and forward it as a Flink Tuple. In the Reduce we update the state for each key. That's about it.

2015-12-02 11:09 GMT+01:00 Aljoscha Krettek <[hidden email]>: |
Hi,
I am working on a use case that involves storing state for billions of keys. For this we use a MySQL state backend that writes each key-value state to a MySQL server, so it only holds a limited set of key-value pairs on heap while maintaining the processing guarantees. This keeps our streaming job from running out of memory, as most of the state is kept off heap.

I am not sure if this is relevant to your use case, but if the state size grows indefinitely you might want to give it a try. I will write a detailed guide in some days, but if you want to get started check this one out:

There are some pending improvements that I will commit in the next days that will increase the performance of the MySQL adapter. Let me know if you are interested in this!

Cheers,
Gyula

Vieru, Mihail <[hidden email]> ezt írta (időpont: 2015. dec. 2., Sze, 11:26):
|
Mihail!

The Flink windows are currently in-memory only. There are plans to relax that, but for the time being, having enough memory in the cluster is important.

@Gyula: I think window state is currently also limited when using the SqlStateBackend, by the size of a row in the database (because windows are not key/value state currently).

Here are some simple rules of thumb to work with:

1) For windows, the number of expected keys can be without bound. It is important to have a rough upper bound for the number of "active keys at a certain time". For example, if you use time windows (let's say of 10 minutes or so), it only matters how many keys you have within each 10 minute interval. Those define how much memory you need.

2) If you work with the "OperatorState" abstraction, then you need to think about cleanup a bit. The OperatorState keeps the state for a key until you set the state for that key to "null". This manual state is explicitly designed to allow you to keep state across windows and across very long time. On the flip side, you need to manage the amount of state you store by releasing the state for keys you no longer need (see the sketch below).

3) If a certain key space grows infinitely, you should "scope the state by time". A pragmatic solution for that is to define a session window:
- The session length defines after what inactivity the state is cleaned (let's say 1h session length or so).
- The trigger implements this session (there are a few mails on this list already that explain how to do this) and takes care of evaluating on every element.
- A count(1) evictor makes sure only one element is ever stored.

Greetings,
Stephan

On Wed, Dec 2, 2015 at 11:37 AM, Gyula Fóra <[hidden email]> wrote:
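To make rule (2) concrete, here is a rough sketch of keyed state that is explicitly released once it is no longer needed. The class, field and message contents are invented for illustration, and the getKeyValueState() call follows the 0.10-style state API, so the exact signature should be checked against the documentation of the version in use:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Input: (orderId, eventTimestamp); must run on a keyed stream,
// e.g. stream.keyBy(0).flatMap(new OrderTimeoutTracker()).
public class OrderTimeoutTracker extends RichFlatMapFunction<Tuple2<String, Long>, String> {

    private OperatorState<Long> firstSeen;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Default value null means "no state stored yet" for the current key.
        firstSeen = getRuntimeContext().getKeyValueState("firstSeen", Long.class, null);
    }

    @Override
    public void flatMap(Tuple2<String, Long> event, Collector<String> out) throws Exception {
        Long start = firstSeen.value();
        if (start == null) {
            // First event for this order: remember when it was placed.
            firstSeen.update(event.f1);
        } else {
            out.collect("order " + event.f0 + " completed after " + (event.f1 - start) + " ms");
            // Release the state for this key, as described in rule (2).
            firstSeen.update(null);
        }
    }
}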
|
Hi Gyula, Hi Stephan,

thank you for your replies. We need state which grows indefinitely for the following use case: an event is created when a customer places an order; another event is created when the order is sent. These events typically occur within days of each other. We need to catch the cases in which these events are spread over more than a specified time period and raise an alarm. So having a window of a couple of days is not feasible; thus we need the state.

I believe having a different state backend would circumvent the OOM issue. We were thinking of Redis for performance reasons. MySQL might do as well, if it doesn't slow down the processing too much. Are there limitations for the SqlStateBackend when working with state only? When would the window state limitation occur?

Cheers,
Mihail

2015-12-02 13:38 GMT+01:00 Stephan Ewen <[hidden email]>:
|
Hi Mihail, not sure if I correctly got your requirements, but you can define windows on a keyed stream. This basically means that you partition the stream, for example by order-id, and compute windows over the keyed stream. This will create one (or more, depending on the window type) window for each key. All windows are independent from each other and can be evaluated and closed without interfering with other windows. Windows for keys that have been closed do not allocate any memory until another element with the same key arrives. 2015-12-02 18:11 GMT+01:00 Vieru, Mihail <[hidden email]>:
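For illustration, a keyed tumbling time window in the spirit of the job discussed in this thread could be wired up roughly as follows (0.10-style API; the tuple layout, window size and aggregation are placeholders, and the fromElements source only stands in for the real, parsed Kafka stream):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Stand-in for the parsed stream: (orderId, itemPrice) tuples.
DataStream<Tuple2<String, Double>> orders = env.fromElements(
        new Tuple2<>("order-1", 10.0),
        new Tuple2<>("order-1", 5.5),
        new Tuple2<>("order-2", 7.0));

orders.keyBy(0)                                       // one logical window per order id
      .timeWindow(Time.of(5, TimeUnit.SECONDS))       // tumbling processing-time window
      .reduce(new ReduceFunction<Tuple2<String, Double>>() {
          @Override
          public Tuple2<String, Double> reduce(Tuple2<String, Double> a, Tuple2<String, Double> b) {
              // Placeholder aggregation: sum the prices seen for the same order id.
              return new Tuple2<>(a.f0, a.f1 + b.f1);
          }
      })
      .writeAsText("file:///tmp/price-per-order");

env.execute("keyed window example");

Windows for keys that see no elements in an interval simply produce nothing and hold no memory for that interval.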
|
In reply to this post by Vieru, Mihail
Hi Mihail!

Do I understand you correctly that the use case is to raise an alarm if an order has not been processed within a certain time period (a certain number of days)?

If that is the case, the use case is actually perfect for a special form of session windows that monitor such timeouts. I have prototyped a sample application for a different use case, but it should fit your use case as well:

In that example, the timeout is 5 seconds, but there is no reason why the timeout could not be multiple days. Windows may be very long - no problem. Unlike many other streaming systems, each key has an individual window, so one key's session window may start at one point in time, and another key's session window at a very different point. One window may finish within a few hours (a fast processed order), another window sees the timeout after three days (an order that was not processed in time).

Greetings,
Stephan

On Wed, Dec 2, 2015 at 6:11 PM, Vieru, Mihail <[hidden email]> wrote:
|
A bit of extra information on the example where I posted the link. The example checks whether two events follow each other within a certain time:
- The first event in the example is called "compute.instance.create.start" (in your case, it would be the event that an order was placed).
- The second event is called "trove.instance.create" (in your case, that the package was sent).

What the timeout window does is the following:
- It triggers either on the second event, or after the timeout is expired.
- The window function checks if the last event was the correct second event. If yes, it sends a Result(OK); if not, it sends a Result(TIMEOUT).

Hope that this helps you build your application!

On Wed, Dec 2, 2015 at 6:25 PM, Stephan Ewen <[hidden email]> wrote:
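To make that last check concrete, here is a tiny, framework-agnostic sketch of the decision the window function makes; the Event and Outcome types and the event names are invented for illustration and are not part of the linked example:

import java.util.List;

class Event {
    final String type;       // e.g. "order.placed" or "order.sent" (invented names)
    final long timestamp;
    Event(String type, long timestamp) { this.type = type; this.timestamp = timestamp; }
}

enum Outcome { OK, TIMEOUT }

class TimeoutCheck {
    // Called when the window fires, i.e. either the second event arrived or the timeout expired.
    // With a count(1) evictor only the most recent event is retained, so inspecting the last
    // buffered element is enough to tell the two cases apart.
    static Outcome evaluate(List<Event> bufferedEvents, String expectedSecondEvent) {
        Event last = bufferedEvents.get(bufferedEvents.size() - 1);
        return expectedSecondEvent.equals(last.type) ? Outcome.OK : Outcome.TIMEOUT;
    }
}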
|