Hello,
I’m trying to convert some of our larger stateful computations into something that aligns more closely with the Flink windowing framework, and in particular to start using “event time” instead of “ingestion time” as the time characteristic. My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka source), and while my data is generally time-ordered, there are some upstream races, so I’m attempting to assign timestamps and watermarks using BoundedOutOfOrdernessTimestampExtractor with a lateness of 30 seconds.

When I assign timestamps directly in the Kafka sources (I’m also connecting two Kafka streams here) using FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work OK, but my extractor has to do a bunch of “faking” because not every record that is produced will have a valid timestamp; for example, a record that can’t be parsed won’t. When I assign timestamps downstream, after filtering the stream down to just the records that are going to be windowed, I see errors in my Flink job:

java.io.IOException: Exception while applying AggregateFunction in aggregating state
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
    ... 6 more

I am calling aggregate() on my windows, but otherwise I see very little information that I can use to dig into this issue. Can anyone give me some insight into what is going wrong here? I’d much prefer assigning timestamps after filtering rather than in the Kafka source, because then I can filter down to only the records that I know will have timestamps.
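The filter-then-assign idea described above can be illustrated without Flink at all. The following is a minimal plain-Java sketch, assuming each record carries an optional parsed timestamp; the `Event` and `BoundedWatermarker` classes and `computeWatermarks` are hypothetical names for illustration, not Flink API. Records without a timestamp are dropped first, and the watermark then trails the highest timestamp seen so far by a fixed 30-second bound, which is roughly the rule BoundedOutOfOrdernessTimestampExtractor applies. Unlike Flink’s periodic assigner, which emits watermarks on an interval, this sketch simply records the watermark after every accepted event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class BoundedLatenessSketch {

    /** Hypothetical record type: a payload plus an event timestamp, if one could be parsed. */
    static final class Event {
        final String payload;
        final Optional<Long> timestampMillis;

        Event(String payload, Optional<Long> timestampMillis) {
            this.payload = payload;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Tracks the highest timestamp seen; the watermark trails it by a fixed bound. */
    static final class BoundedWatermarker {
        private final long maxOutOfOrdernessMillis;
        private long currentMaxTimestamp;

        BoundedWatermarker(long maxOutOfOrdernessMillis) {
            this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
            // Start low enough that the initial watermark does not underflow.
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
        }

        /** Returns the event's timestamp and updates the running maximum. */
        long extractTimestamp(long timestampMillis) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMillis);
            return timestampMillis;
        }

        /** Watermark = highest timestamp seen minus the allowed out-of-orderness. */
        long currentWatermark() {
            return currentMaxTimestamp - maxOutOfOrdernessMillis;
        }
    }

    /** Drops events without a timestamp, then records the watermark after each remaining event. */
    static List<Long> computeWatermarks(List<Event> events, long maxOutOfOrdernessMillis) {
        BoundedWatermarker watermarker = new BoundedWatermarker(maxOutOfOrdernessMillis);
        List<Long> watermarks = new ArrayList<>();
        for (Event e : events) {
            if (!e.timestampMillis.isPresent()) {
                continue; // filter out instead of faking a timestamp
            }
            watermarker.extractTimestamp(e.timestampMillis.get());
            watermarks.add(watermarker.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
            new Event("a", Optional.of(100_000L)),
            new Event("unparseable", Optional.empty()),
            new Event("b", Optional.of(130_000L)),
            new Event("c", Optional.of(110_000L))); // out of order, but within 30 s
        System.out.println(computeWatermarks(input, 30_000L)); // prints [70000, 100000, 100000]
    }
}
```

In this example, “c” arrives out of order, but its timestamp (110 000 ms) is still above the current watermark (100 000 ms), so it would not be treated as late.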
When experimenting with the lateness in my timestamp/watermark assigner, I also saw a similarly opaque exception:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
    at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
    at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
    at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
    at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
    at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
    …

Any tips?

Thanks,
Andrew

-- 
*Confidentiality Notice: The information contained in this e-mail and any attachments may be confidential. If you are not an intended recipient, you are hereby notified that any dissemination, distribution or copying of this e-mail is strictly prohibited. If you have received this e-mail in error, please notify the sender and permanently delete the e-mail and any attachments immediately. You should not retain, copy or use this e-mail or any attachment for any purpose, nor disclose all or any part of the contents to any other person. Thank you.*
Hi Andrew,

Which Flink version are you using? This sounds a bit like https://issues.apache.org/jira/browse/FLINK-8836.

Cheers,
Konstantin

On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <[hidden email]> wrote:

-- 
Konstantin Knauf | Solutions Architect
+49 160 91394525
Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
This is with Flink 1.6.4. I was previously on 1.6.2 and saw Kryo issues in many more circumstances.
Hi Andrew,

Generally, this looks like a concurrency problem. Are you using asynchronous checkpointing? If so, could you check whether the issue also occurs with synchronous checkpointing? There have been recent reports that there might be a problem with some Kryo types.

Can you set the logging level to DEBUG? In that case, the KryoSerializer enables some additional checks that verify whether it is really being accessed concurrently.

Are you using any Scala types, in particular collections or "Try"?

Cheers,
Konstantin

On Sat, Mar 9, 2019 at 6:22 AM Andrew Roberts <[hidden email]> wrote:
Hi,

I think this looks like the same problem as in this issue: https://issues.apache.org/jira/browse/FLINK-11420

Best,
Stefan