Posted by
Timo Walther on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Fwd-Re-Updating-ValueState-not-working-in-hosted-Kinesis-tp32905.html
Hi Chris,
[forwarding the private discussion to the mailing list again]
first of all, are you sure that your Sensor class is either a top-level
class or a static inner class. Because it seems there is way more stuff
in it (maybe included by accident transitively?). Such as:
org.apache.logging.log4j.core.layout.AbstractCsvLayout
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> cachedPDs (javax.security.auth.SubjectDomainCombiner)
> combiner (java.security.AccessControlContext)
> acc (sun.security.ssl.SSLSocketImpl)
> connection (org.postgresql.core.PGStream)
> pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> commitQuery (org.postgresql.jdbc.PgConnection)
> connection (org.postgresql.jdbc.PgResultSet)
> val$rs
When declaring state you can use
`org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to
check if your state is a POJO type.
Regards,
Timo
-------- Forwarded Message --------
Subject: Re: Updating ValueState not working in hosted Kinesis
Date: Wed, 19 Feb 2020 12:02:16 +0000
From: Chris Stevens <
[hidden email]>
To: Timo Walther <
[hidden email]>
Hi Timo,
Thanks for your reply. This makes sense to me, how do I treat something
as a POJO instead of a generic serialized BB type? Sorry relatively new
to Java and Flink.
This is my full class def:
package sensingfeeling.models;
import java.io.Serializable;
public class Sensor implements Serializable {
private static final long serialVersionUID = 8582433437601788991L;
public String sensorId;
public String companyId;
public String label;
// public Date createdAt;
// public Date updatedAt;
public Integer uncomfortableFaceLimit;
public Boolean online;
public String capabilityId;
// public Date lastOnlineAt;
// public Date lastOfflineAt;
public Integer onlineVersionNumber;
public int status;
@Override
public String toString(){
return this.sensorId + " - " + this.label;
}
}
Super simple really.
I'm not trying to upgrade anything as far as I know. Just making an
operator state aware.
Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595
On Wed, 19 Feb 2020 at 11:55, Timo Walther <
[hidden email]
<mailto:
[hidden email]>> wrote:
Hi Chris,
it seems there are field serialized into state that actually don't
belong there. You should aim to treat Sensor as a POJO instead of a
Kryo
generic serialized black-box type.
Furthermore, it seems that field such as
"org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be
state. Is there a "transient" keyword missing?
Are you trying to upgrade your job or the Flink version?
Regards,
Timo
On 18.02.20 18:59, Chris Stevens wrote:
> Hi there,
>
> I'm trying to update state in one of my applications hosted in
Kinesis
> Data Analytics.
>
> private transient ValueState<Sensor> sensorState;
> using sensorState.update(sensor);
>
> Get error:
>
> An error occurred: org.apache.flink.util.FlinkRuntimeException:
Error
> while adding data to RocksDB
> at
>
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
> at
>
org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
> at
>
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
> at
>
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
> at
>
org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
> at
>
org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
> at
>
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> at
>
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> at
>
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
> at
>
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
> at
>
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
> at
>
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> at
>
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
> at
> org.apache.flink.streaming.runtime.io
<
http://runtime.io>.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> at
>
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at
>
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at
> org.apache.flink.streaming.runtime.io
<
http://runtime.io>.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at
>
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
>
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException:
> java.lang.IllegalArgumentException: Unable to create serializer
> "com.esotericsoftware.kryo.serializers.FieldSerializer" for class:
> org.apache.logging.log4j.core.layout.AbstractCsvLayout
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> cachedPDs (javax.security.auth.SubjectDomainCombiner)
> combiner (java.security.AccessControlContext)
> acc (sun.security.ssl.SSLSocketImpl)
> connection (org.postgresql.core.PGStream)
> pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> commitQuery (org.postgresql.jdbc.PgConnection)
> connection (org.postgresql.jdbc.PgResultSet)
> val$rs
>
(sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at
>
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
> at
>
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at
>
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> at
>
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
> at
>
org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
> at
>
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
> at
>
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
> at
>
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
> at
>
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: Unable to create
> serializer
"com.esotericsoftware.kryo.serializers.FieldSerializer" for
> class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
> at
>
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
> at
>
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
> at
com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
> at
com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
> at
com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
> at
>
com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
> at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
> at
>
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> at
>
com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
> at
>
com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> at
>
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
> at
>
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at
>
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ... 62 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown
Source)
> at
>
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
>
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
> ... 78 more
> Caused by: java.lang.NoClassDefFoundError:
> Lorg/apache/commons/csv/CSVFormat;
> at java.lang.Class.getDeclaredFields0(Native Method)
> at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
> at java.lang.Class.getDeclaredFields(Class.java:1916)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
> at
>
com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
> ... 82 more
> Caused by: java.lang.ClassNotFoundException:
> org.apache.commons.csv.CSVFormat
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 88 more
>
> Any help would be great. I tried manually including CSVFormat from
> apache commons but didn't change anything.
>
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595