Understanding timestamp and watermark assignment errors

Andrew Roberts
Hello,

I’m trying to convert some of our larger stateful computations into something that aligns better with the Flink windowing framework and, in particular, to start using “event time” instead of “ingestion time” as the time characteristic.

My data comes in from Kafka (0.8.2.2, using the out-of-the-box Kafka source). It is generally time-ordered, but there are some upstream races, so I’m assigning timestamps and watermarks using BoundedOutOfOrdernessTimestampExtractor with a maximum out-of-orderness (lateness) of 30 seconds. When I assign timestamps directly in the Kafka sources (I’m also connecting two Kafka streams here) using FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work OK, but my extractor has to do a bunch of “faking” because not every record that is produced has a valid timestamp. For example, a record that can’t be parsed has none.
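For readers unfamiliar with the bounded out-of-orderness approach, here is a minimal, Flink-free sketch of the idea: the watermark trails the highest timestamp seen so far by a fixed bound. The class `BoundedOutOfOrdernessSketch` and its method names are hypothetical and written only for illustration; this is not the code of Flink's BoundedOutOfOrdernessTimestampExtractor.

```java
/**
 * Hypothetical, Flink-free sketch of bounded out-of-orderness watermarking.
 * All timestamps are epoch milliseconds.
 */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Offset the start value so the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records an element's event timestamp and returns it unchanged. */
    long onElement(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return eventTimestampMs;
    }

    /** Watermark = highest timestamp seen minus the bound; it never moves backwards. */
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    /** True if the timestamp is at or behind the watermark, i.e. it arrived later than the bound allows. */
    boolean isBehindWatermark(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }
}
```

With a bound of 30 seconds, an element stamped 100,000 ms moves the watermark to 70,000 ms. A subsequent element stamped 90,000 ms is out of order but still ahead of the watermark, while one stamped 60,000 ms would already be behind it.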

When I assign timestamps downstream, after filtering the stream down to just records that are going to be windowed, I see errors in my Flink job:

java.io.IOException: Exception while applying AggregateFunction in aggregating state
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
        ... 6 more

I am calling aggregate() on my windows, but otherwise I see very little information that I can use to dig into this issue. Can anyone give me any insight into what is going wrong here? I’d much prefer assigning timestamps after filtering, rather than in the Kafka source, because I can filter down to only records that I know will have timestamps.
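The motivation for filtering first can be sketched without Flink: once unparseable records are dropped, the timestamp extractor is defined for every remaining record and needs no placeholder values. Everything below is hypothetical and for illustration only, including the `ParsedEvent` type, the `"key,epochMillis"` input format, and the method names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical record type: only successfully parsed records carry a timestamp. */
final class ParsedEvent {
    final String key;
    final long eventTimeMs;

    ParsedEvent(String key, long eventTimeMs) {
        this.key = key;
        this.eventTimeMs = eventTimeMs;
    }
}

final class FilterThenTimestampSketch {
    /** Parses "key,epochMillis" (an assumed format); returns empty for anything malformed. */
    static Optional<ParsedEvent> parse(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        String[] parts = raw.split(",", -1);
        if (parts.length != 2 || parts[0].isEmpty()) {
            return Optional.empty();
        }
        try {
            return Optional.of(new ParsedEvent(parts[0], Long.parseLong(parts[1].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    /** Keeps only parseable records, in their original order. */
    static List<ParsedEvent> keepWindowable(List<String> rawRecords) {
        List<ParsedEvent> out = new ArrayList<>();
        for (String raw : rawRecords) {
            parse(raw).ifPresent(out::add);
        }
        return out;
    }

    /** Defined for every filtered record, so no fake timestamp is ever needed. */
    static long extractTimestamp(ParsedEvent event) {
        return event.eventTimeMs;
    }
}
```

In a streaming job, the equivalent ordering would be to filter (or flat-map) to the parsed type first and assign timestamps and watermarks on the resulting stream.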

When experimenting with the lateness in my timestamp/watermark assigner, I also saw a similarly opaque exception:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
        at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
        at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
        at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
        at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
        at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
        …


Any tips?


Thanks,

Andrew
--
*Confidentiality Notice: The information contained in this e-mail and any
attachments may be confidential. If you are not an intended recipient, you
are hereby notified that any dissemination, distribution or copying of this
e-mail is strictly prohibited. If you have received this e-mail in error,
please notify the sender and permanently delete the e-mail and any
attachments immediately. You should not retain, copy or use this e-mail or
any attachment for any purpose, nor disclose all or any part of the
contents to any other person. Thank you.*

Re: Understanding timestamp and watermark assignment errors

Konstantin Knauf-2
Hi Andrew,

Which Flink version are you using? This sounds a bit like https://issues.apache.org/jira/browse/FLINK-8836.

Cheers,

Konstantin

On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <[hidden email]> wrote:

--

Konstantin Knauf | Solutions Architect

+49 160 91394525


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   

Re: Understanding timestamp and watermark assignment errors

Andrew Roberts
This is with Flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more circumstances.

On Mar 8, 2019, at 4:25 PM, Konstantin Knauf <[hidden email]> wrote:

Re: Understanding timestamp and watermark assignment errors

Konstantin Knauf-2
Hi Andrew,

Generally, this looks like a concurrency problem.

Are you using asynchronous checkpointing? If so, could you check whether this issue also occurs with synchronous checkpointing? There have been reports recently that there might be a problem with some Kryo types.

Can you set the logging level to DEBUG? In that case, some checks are enabled in the Kryo serializer that verify whether the KryoSerializer is really being accessed concurrently.
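To illustrate what a concurrent-access check can look like in general, here is a minimal, Flink-free sketch of a fail-fast guard around a component with mutable internal state. `ExclusiveThreadGuard` and `GuardedCopier` are hypothetical names invented for this example; the sketch does not reproduce the checks in Flink's KryoSerializer.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical guard: throws as soon as a thread enters while another thread still holds it. */
final class ExclusiveThreadGuard {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    void enter() {
        Thread current = Thread.currentThread();
        // Succeeds only if nobody currently holds the guard (non-reentrant by design).
        if (!owner.compareAndSet(null, current)) {
            throw new IllegalStateException(
                "Concurrent access: held by " + owner.get() + ", entered by " + current);
        }
    }

    void exit() {
        owner.set(null);
    }
}

/** Hypothetical stand-in for a serializer that reuses an internal scratch buffer. */
final class GuardedCopier {
    private final ExclusiveThreadGuard guard = new ExclusiveThreadGuard();
    private final StringBuilder scratch = new StringBuilder(); // mutable state, unsafe to share

    String copy(String value) {
        guard.enter();
        try {
            scratch.setLength(0);
            scratch.append(value);
            return scratch.toString();
        } finally {
            guard.exit();
        }
    }
}
```

A guard like this turns a silent corruption (for example, an index error deep inside a shared buffer) into an immediate exception that names both threads, which is usually much easier to diagnose.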

Are you using any Scala types, in particular collections or "Try"?

Cheers,

Konstantin

On Sat, Mar 9, 2019 at 6:22 AM Andrew Roberts <[hidden email]> wrote:

Re: Understanding timestamp and watermark assignment errors

Stefan Richter-4
Hi,

I think this looks like the same problem as in this issue: https://issues.apache.org/jira/browse/FLINK-11420
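The first stack trace in this thread shows a collection serializer (TraversableSerializer.copy) delegating to a nested KryoSerializer.copy. One general way such a setup can go wrong under concurrency is when a composite serializer is duplicated for a second thread but still shares its stateful nested serializer with the original. The sketch below shows the safe pattern, a deep duplicate, using hypothetical Flink-free types (`CopySerializer`, `StatefulStringSerializer`, `ListSerializerSketch`); it is an illustration of the pattern only and makes no claim about the contents of the linked issue.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal contract: duplicate() must return an instance that is safe to use on another thread. */
interface CopySerializer<T> {
    T copy(T value);

    CopySerializer<T> duplicate();
}

/** Stateful element serializer: its scratch buffer makes a single instance unsafe to share across threads. */
final class StatefulStringSerializer implements CopySerializer<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    public String copy(String value) {
        scratch.setLength(0);
        return scratch.append(value).toString();
    }

    @Override
    public CopySerializer<String> duplicate() {
        return new StatefulStringSerializer(); // fresh scratch buffer per duplicate
    }
}

/** Composite serializer for lists that delegates each element to a nested serializer. */
final class ListSerializerSketch<T> implements CopySerializer<List<T>> {
    final CopySerializer<T> elementSerializer;

    ListSerializerSketch(CopySerializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    @Override
    public List<T> copy(List<T> value) {
        List<T> out = new ArrayList<>(value.size());
        for (T element : value) {
            out.add(elementSerializer.copy(element));
        }
        return out;
    }

    /** Deep duplicate: the nested serializer is duplicated too, so no mutable state is shared. */
    @Override
    public ListSerializerSketch<T> duplicate() {
        return new ListSerializerSketch<>(elementSerializer.duplicate());
    }
}
```

A shallow variant that returned `this`, or a new wrapper around the same `elementSerializer`, would let two threads mutate one scratch buffer at the same time.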

Best,
Stefan


On 13. Mar 2019, at 09:41, Konstantin Knauf <[hidden email]> wrote:

Hi Andrew,

generally, this looks like a concurrency problem.

Are you using asynchronous checkpointing? If so, could you check if this issue also occurs with synchronous checkpointing. There have been reports recently, that there might be a problem with some Kryo types.

Can you set the logging level to DEBUG? We have some checks enabled in that case in the Kryo serializer to verify that the KryoSerializer is really concurrently accessed.

Are you using any Scala types, in particular collections or "Try"?

Cheers,

Konstantin

On Sat, Mar 9, 2019 at 6:22 AM Andrew Roberts <[hidden email]> wrote:
This is with flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more circumstances. 

On Mar 8, 2019, at 4:25 PM, Konstantin Knauf <[hidden email]> wrote:

Hi Andrew,

which Flink version do you use? This sounds a bit like https://issues.apache.org/jira/browse/FLINK-8836.

Cheers,

Konstantin

On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <[hidden email]> wrote:
Hello,

I’m trying to convert some of our larger stateful computations into something that aligns more with the Flink windowing framework, and particularly, start using “event time” instead of “ingest time” as a time characteristics.

My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka source), and while my data is generally time-ordered, there are some upstream races, so I’m attempting to assign timestamps and watermarks using BoundedOutOfOrdernessTimestampExtractor, and a lateness of 30 seconds. When I assign timestamps directly in the Kafka sources (I’m also connecting two Kafka streams here) using FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work ok, but my extractor has to do a bunch of “faking” because not every record that is produced will have a valid timestamp - for example, a record that can’t be parsed won’t.

When I assign timestamps downstream, after filtering the stream down to just records that are going to be windowed, I see errors in my Flink job:

java.io.IOException: Exception while applying AggregateFunction in aggregating state
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
        ... 6 more

I am calling aggregate() on my windows, but otherwise I see very little information that I can use to dig into this issue. Can anyone give me any insight into what is going wrong here? I’d much prefer assigning timestamps after filtering, rather than in the Kafka source, because I can filter down to only records that I know will have timestamps.
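To make the "filter first, then assign timestamps" ordering concrete, here is a small plain-Java sketch. It assumes a hypothetical record format of "<epochMillis>,<payload>" and hypothetical helper names (parseTimestamp, timestampsOfWindowable); neither comes from the actual job. In a Flink job the corresponding shape would presumably be a filter() on the stream followed by assignTimestampsAndWatermarks() on the filtered DataStream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

/** Sketch: drop records without a usable timestamp before extracting timestamps. */
final class FilterThenTimestampSketch {

    /** Hypothetical format "<epochMillis>,<payload>"; empty if it cannot be parsed. */
    static OptionalLong parseTimestamp(String raw) {
        int comma = raw.indexOf(',');
        if (comma <= 0) {
            return OptionalLong.empty(); // no separator, or nothing before it
        }
        try {
            return OptionalLong.of(Long.parseLong(raw.substring(0, comma)));
        } catch (NumberFormatException e) {
            return OptionalLong.empty(); // timestamp field is not a number
        }
    }

    /** Keeps only records with a valid timestamp, so no "faked" values are needed. */
    static List<Long> timestampsOfWindowable(List<String> rawRecords) {
        List<Long> timestamps = new ArrayList<>();
        for (String raw : rawRecords) {
            OptionalLong ts = parseTimestamp(raw);
            if (ts.isPresent()) {
                timestamps.add(ts.getAsLong());
            }
        }
        return timestamps;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("1000,a", "garbage", "x,b", "2000,c");
        System.out.println(timestampsOfWindowable(input));
    }
}
```

The point of this ordering is that the timestamp extractor only ever sees records that are known to carry a timestamp, so it never has to invent one for unparseable input.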

When experimenting with the lateness in my timestamp/watermark assigner, I also saw a similarly opaque exception:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
        at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
        at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
        at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
        at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
        at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
        …


Any tips?


Thanks,

Andrew
--
*Confidentiality Notice: The information contained in this e-mail and any
attachments may be confidential. If you are not an intended recipient, you
are hereby notified that any dissemination, distribution or copying of this
e-mail is strictly prohibited. If you have received this e-mail in error,
please notify the sender and permanently delete the e-mail and any
attachments immediately. You should not retain, copy or use this e-mail or
any attachment for any purpose, nor disclose all or any part of the
contents to any other person. Thank you.*


--
Konstantin Knauf | Solutions Architect
+49 160 91394525

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   
