Thank you, Robert! The issue with Kafka is now solved with the 0.10-SNAPSHOT dependency.

We have now run into an OutOfMemoryError, though, which appears to be related to the state. As my colleague, Javier Lopez, mentioned in a previous thread, state handling is crucial for our use case. And as the jobs are intended to run for months, stability plays an important role in choosing a stream processing framework.

12/02/2015 10:03:53 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
java.lang.OutOfMemoryError: Java heap space
at java.util.HashMap.resize(HashMap.java:703)
at java.util.HashMap.putVal(HashMap.java:662)
at java.util.HashMap.put(HashMap.java:611)
at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

2015-12-01 17:42 GMT+01:00 Maximilian Michels <[hidden email]>:

Thanks! I've linked the issue in JIRA.

On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <[hidden email]> wrote:
> I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824
>
> On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <[hidden email]> wrote:
>>
>> I know this has been fixed already but, out of curiosity, could you
>> point me to the Kafka JIRA issue for this
>> bug? From the Flink issue it looks like this is a Zookeeper version
>> mismatch.
>>
>> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <[hidden email]>
>> wrote:
>> > Hi Gyula,
>> >
>> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>> > "release-0.10" branch to Apache's maven snapshot repository.
>> >
>> >
>> > I don't think Mihail's code will run when he's compiling it against
>> > 1.0-SNAPSHOT.
>> >
>> >
>> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <[hidden email]> wrote:
>> >>
>> >> Hi,
>> >>
>> >> I think Robert meant to write setting the connector dependency to
>> >> 1.0-SNAPSHOT.
>> >>
>> >> Cheers,
>> >> Gyula
>> >>
>> >> Robert Metzger <[hidden email]> wrote (on Tue, Dec 1, 2015,
>> >> 17:10):
>> >>>
>> >>> Hi Mihail,
>> >>>
>> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this
>> >>> as well: https://issues.apache.org/jira/browse/FLINK-3067
>> >>>
>> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain
>> >>> a fix.
>> >>>
>> >>> Since the Kafka connector is not contained in the Flink binary, you
>> >>> can just set the version in your Maven pom file to 0.10-SNAPSHOT.
>> >>> Maven will then download the code planned for the 0.10-SNAPSHOT
>> >>> release.
>> >>>
>> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <[hidden email]> wrote:
>> >>>>
>> >>>> Hi,
>> >>>>
>> >>>> we get the following NullPointerException after ~50 minutes when
>> >>>> running a streaming job with windowing and state that reads data
>> >>>> from Kafka and writes the result to the local FS.
>> >>>> There are around 170 million messages to be processed; Flink 0.10.1
>> >>>> stops at ~8 million.
>> >>>> Flink runs locally, started with the "start-cluster-streaming.sh"
>> >>>> script.
>> >>>>
>> >>>> 12/01/2015 15:06:24 Job execution switched to status RUNNING.
>> >>>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>> >>>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>> >>>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>> >>>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>> >>>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>> >>>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>> >>>> 12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>> >>>> 12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>> >>>> java.lang.Exception
>> >>>> at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>> >>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>> >>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> >>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>> >>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> >>>> at java.lang.Thread.run(Thread.java:745)
>> >>>> Caused by: java.lang.NullPointerException
>> >>>> at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>> >>>> at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>> >>>> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>> >>>> at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>> >>>> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>> >>>> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>> >>>> at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>> >>>> at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>> >>>> at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>> >>>> at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>> >>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>> >>>>
>> >>>>
>> >>>> Any ideas on what could cause this behaviour?
>> >>>>
>> >>>> Best,
>> >>>> Mihail
>> >>>
>> >>>
>> >
>
>