Hi there,

I'm trying to update state in one of my applications hosted in Kinesis Data Analytics.

private transient ValueState<Sensor> sensorState;
using sensorState.update(sensor);

Get error:

An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
    at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
    at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
    at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
    at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
    at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
cachedPDs (javax.security.auth.SubjectDomainCombiner)
combiner (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
connection (org.postgresql.core.PGStream)
pgStream (org.postgresql.core.v3.QueryExecutorImpl)
transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
commitQuery (org.postgresql.jdbc.PgConnection)
connection (org.postgresql.jdbc.PgResultSet)
val$rs (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
    at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
    ... 20 more
Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
    at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
    at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
    at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
    at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
    at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
    at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
    at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    ... 62 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
    ... 78 more
Caused by: java.lang.NoClassDefFoundError: Lorg/apache/commons/csv/CSVFormat;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredFields(Class.java:1916)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
    ... 82 more
Caused by: java.lang.ClassNotFoundException: org.apache.commons.csv.CSVFormat
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 88 more

Any help would be great. I tried manually including CSVFormat from apache commons but didn't change anything.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595

Hi Chris,
It seems that fields are being serialized into state that don't belong there. You should aim to have Sensor treated as a POJO rather than as a generic black-box type serialized by Kryo. Furthermore, a field such as "org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be part of the state. Is a "transient" keyword missing?

Are you trying to upgrade your job or the Flink version?

Regards,
Timo

On 18.02.20 18:59, Chris Stevens wrote:
> Hi there,
>
> I'm trying to update state in one of my applications hosted in Kinesis
> Data Analytics.
>
> private transient ValueState<Sensor> sensorState;
> using sensorState.update(sensor);
>
> Get error:
>
> An error occurred: org.apache.flink.util.FlinkRuntimeException: Error
> while adding data to RocksDB
> [...]
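
A note on the advice in the reply, with a small illustration. The outermost entry of the serialization trace is val$rs in FrameMotionPathsToTelemetryJoinFunction$4. The val$ prefix is how the Java compiler names a local variable captured by an anonymous class, so it looks as if an anonymous class captured a PgResultSet called rs and became reachable from the Sensor value being written to state. The original Sensor class and join function are not shown in the thread, so the following is only a minimal sketch under that assumption. Row (a stand-in for a JDBC ResultSet), the Sensor fields, lazyLocation, fromRow and capturesLocals are all hypothetical names made up for the example.

```java
import java.lang.reflect.Field;
import java.util.function.Supplier;

public class SensorStateSketch {

    /** Hypothetical stand-in for a JDBC ResultSet row, so the sketch runs without a database. */
    public interface Row {
        String getString(String column);
    }

    /**
     * Hypothetical state type shaped like a Flink POJO: public class,
     * public no-argument constructor, public fields holding plain values only.
     */
    public static class Sensor {
        public String id;
        public String location;

        public Sensor() {}
    }

    /** Risky pattern: the anonymous class keeps a hidden reference to rs (a synthetic val$rs field). */
    public static Supplier<String> lazyLocation(final Row rs) {
        return new Supplier<String>() {
            @Override
            public String get() {
                return rs.getString("location");
            }
        };
    }

    /** Safer pattern: copy the values out eagerly so the state object never references the row. */
    public static Sensor fromRow(Row rs) {
        Sensor s = new Sensor();
        s.id = rs.getString("id");
        s.location = rs.getString("location");
        return s;
    }

    /** Returns true if the object's class declares a field for a captured local variable (val$...). */
    public static boolean capturesLocals(Object o) {
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.getName().startsWith("val$")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Fake row: returns "sensor-1" for the id column and "lobby" for anything else.
        Row row = column -> column.equals("id") ? "sensor-1" : "lobby";

        System.out.println("anonymous class captures rs: " + capturesLocals(lazyLocation(row)));
        System.out.println("plain Sensor captures rs:    " + capturesLocals(fromRow(row)));
    }
}
```

If the real Sensor holds something like the object returned by lazyLocation, a generic serializer that walks fields will follow it into the connection, the socket and the class loader, which matches the trace above. Building the value the way fromRow does keeps only plain data in state; any connection or result set the function needs would then live in a transient field of the function itself rather than in Sensor.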