Fwd: Re: Updating ValueState not working in hosted Kinesis

Posted by Timo Walther on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Fwd-Re-Updating-ValueState-not-working-in-hosted-Kinesis-tp32905.html

Hi Chris,

[forwarding the private discussion to the mailing list again]

First of all, are you sure that your Sensor class is either a top-level
class or a static inner class? It seems there is way more stuff in it
(maybe included transitively by accident?), such as:

org.apache.logging.log4j.core.layout.AbstractCsvLayout
      > Serialization trace:
      > classes (sun.misc.Launcher$AppClassLoader)
      > classloader (java.security.ProtectionDomain)
      > cachedPDs (javax.security.auth.SubjectDomainCombiner)
      > combiner (java.security.AccessControlContext)
      > acc (sun.security.ssl.SSLSocketImpl)
      > connection (org.postgresql.core.PGStream)
      > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
      > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
      > commitQuery (org.postgresql.jdbc.PgConnection)
      > connection (org.postgresql.jdbc.PgResultSet)
      > val$rs
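
The trailing `val$rs` entry follows the naming javac typically uses for a local variable captured by an anonymous class, and the full trace further down attributes it to `FrameMotionPathsToTelemetryJoinFunction$4`. The join function's code is not shown in this thread, so the following is only a sketch of one way such a field could arise: a state object built as an anonymous subclass keeps a hidden reference to the captured variable. `CaptureDemo`, `SimpleSensor` and `FakeResultSet` are hypothetical names, and only the JDK is used.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class CaptureDemo {

    // Hypothetical stand-in for the Sensor class from this thread.
    static class SimpleSensor {
        public String sensorId;
        public String label;
    }

    // Hypothetical stand-in for a JDBC ResultSet: any heavy,
    // non-serializable resource would behave the same way here.
    static class FakeResultSet {
        String getString(String column) {
            return column + "-value";
        }
    }

    // Anonymous subclass with an initializer block: the returned instance
    // keeps a hidden reference to the captured local variable rs.
    static SimpleSensor viaAnonymousSubclass(final FakeResultSet rs) {
        return new SimpleSensor() {
            {
                sensorId = rs.getString("sensor_id");
                label = rs.getString("label");
            }
        };
    }

    // Plain instance: only the two declared String fields are held.
    static SimpleSensor viaPlainInstance(FakeResultSet rs) {
        SimpleSensor s = new SimpleSensor();
        s.sensorId = rs.getString("sensor_id");
        s.label = rs.getString("label");
        return s;
    }

    // Names of compiler-generated fields declared on the runtime class.
    static List<String> syntheticFields(Object o) {
        List<String> names = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.isSynthetic()) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        FakeResultSet rs = new FakeResultSet();
        SimpleSensor anon = viaAnonymousSubclass(rs);
        SimpleSensor plain = viaPlainInstance(rs);
        System.out.println(anon.getClass().getName() + " " + syntheticFields(anon));
        System.out.println(plain.getClass().getName() + " " + syntheticFields(plain));
    }
}
```

Running `main` prints the runtime class and its compiler-generated fields for both variants. The anonymous variant is a different class from `SimpleSensor` and carries an extra field for the captured variable, which is the kind of entry Kryo reports as `val$rs` in the trace. The plain variant carries nothing beyond its declared fields.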

When declaring state you can use
`org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to
check if your state is a POJO type.
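
For a quick sanity check without a Flink dependency, the sketch below approximates the POJO rules from the Flink documentation (public class, not a non-static inner class, public no-argument constructor, every field either public or reachable through a getter and setter). It is a rough JDK-only approximation, not Flink's implementation, and the `Types#POJO` call above remains the authoritative check. `PojoRuleCheck`, `GoodSensor` and `InnerSensor` are hypothetical names.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class PojoRuleCheck {

    // Hypothetical sample that satisfies the rules.
    public static class GoodSensor {
        public String sensorId;
        private int status;

        public GoodSensor() {}

        public int getStatus() { return status; }

        public void setStatus(int status) { this.status = status; }
    }

    // Hypothetical sample that breaks them: a non-static inner class.
    public class InnerSensor {
        private String sensorId;
    }

    // Approximates the documented POJO rules; an assumption-laden sketch.
    static boolean looksLikePojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) return false;
        if (c.isAnonymousClass() || c.isLocalClass()) return false;
        if (c.isMemberClass() && !Modifier.isStatic(c.getModifiers())) return false;
        try {
            c.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Class<?> k = c; k != null && k != Object.class; k = k.getSuperclass()) {
            for (Field f : k.getDeclaredFields()) {
                int m = f.getModifiers();
                if (Modifier.isStatic(m) || Modifier.isTransient(m)) continue;
                boolean publicField = Modifier.isPublic(m) && !Modifier.isFinal(m);
                if (!publicField && !hasAccessors(c, f)) return false;
            }
        }
        return true;
    }

    // True if c exposes a public getter (get/is) and setter for field f.
    static boolean hasAccessors(Class<?> c, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : c.getMethods()) {
            String n = m.getName();
            if (m.getParameterCount() == 0 && (n.equals("get" + suffix) || n.equals("is" + suffix))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && n.equals("set" + suffix)) {
                setter = true;
            }
        }
        return getter && setter;
    }

    public static void main(String[] args) {
        System.out.println("GoodSensor:  " + looksLikePojo(GoodSensor.class));
        System.out.println("InnerSensor: " + looksLikePojo(InnerSensor.class));
    }
}
```

A class shaped like the `Sensor` posted further down (public, implicit no-argument constructor, public fields) would pass this approximation, whereas an anonymous or non-static inner subclass of it would not, because the check looks at the class it is given rather than at the declared type.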

Regards,
Timo


-------- Forwarded Message --------
Subject: Re: Updating ValueState not working in hosted Kinesis
Date: Wed, 19 Feb 2020 12:02:16 +0000
From: Chris Stevens <[hidden email]>
To: Timo Walther <[hidden email]>



Hi Timo,

Thanks for your reply. This makes sense to me. How do I treat something
as a POJO instead of a generic serialized black-box type? Sorry, I'm
relatively new to Java and Flink.

This is my full class def:

package sensingfeeling.models;
import java.io.Serializable;

public class Sensor implements Serializable {

      private static final long serialVersionUID = 8582433437601788991L;
      public String sensorId;
      public String companyId;
      public String label;
      // public Date createdAt;
      // public Date updatedAt;
      public Integer uncomfortableFaceLimit;
      public Boolean online;
      public String capabilityId;
      // public Date lastOnlineAt;
      // public Date lastOfflineAt;
      public Integer onlineVersionNumber;
      public int status;
      @Override
      public String toString(){
          return this.sensorId + " - " + this.label;
      }
}

Super simple really.

I'm not trying to upgrade anything as far as I know, just making an
operator state-aware.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 11:55, Timo Walther <[hidden email]> wrote:

     Hi Chris,

     it seems there are fields serialized into state that don't actually
     belong there. You should aim to treat Sensor as a POJO instead of a
     Kryo generic serialized black-box type.

     Furthermore, it seems that fields such as
     "org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be
     state. Is there a "transient" keyword missing?

     Are you trying to upgrade your job or the Flink version?

     Regards,
     Timo
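
As an illustration of what the `transient` keyword does, here is a minimal sketch using plain JDK serialization as a stand-in. Flink's Kryo fallback is a different mechanism, so this only demonstrates the keyword's general effect and is not a reproduction of the job in this thread. `TransientDemo`, `Holder` and `Helper` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientDemo {

    // Hypothetical non-serializable helper, standing in for a logger or
    // a database connection.
    static class Helper {
    }

    // Hypothetical state-like class: id is data, helper is infrastructure.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        String id = "sensor-1";
        // Marked transient, so JDK serialization skips it.
        transient Helper helper = new Helper();
    }

    // Serializes the holder to bytes and reads it back.
    static Holder roundTrip(Holder in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream is =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) is.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder copy = roundTrip(new Holder());
        System.out.println(copy.id + " helper=" + copy.helper);
    }
}
```

The `id` field survives the round trip while the transient `helper` comes back as `null`. Without the keyword, writing the object would fail because `Helper` is not serializable.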



     On 18.02.20 18:59, Chris Stevens wrote:
      > Hi there,
      >
      > I'm trying to update state in one of my applications hosted in
      > Kinesis Data Analytics.
      >
      > private transient ValueState<Sensor> sensorState;
      > using sensorState.update(sensor);
      >
      > Get error:
      >
      > An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
      > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
      > at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
      > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
      > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
      > at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
      > at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
      > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
      > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
      > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
      > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
      > at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
      > at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
      > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
      > at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
      > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
      > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
      > at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
      > at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
      > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
      > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
      > at java.lang.Thread.run(Thread.java:748)
      > Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
      > Serialization trace:
      > classes (sun.misc.Launcher$AppClassLoader)
      > classloader (java.security.ProtectionDomain)
      > cachedPDs (javax.security.auth.SubjectDomainCombiner)
      > combiner (java.security.AccessControlContext)
      > acc (sun.security.ssl.SSLSocketImpl)
      > connection (org.postgresql.core.PGStream)
      > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
      > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
      > commitQuery (org.postgresql.jdbc.PgConnection)
      > connection (org.postgresql.jdbc.PgResultSet)
      > val$rs (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
      > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
      > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
      > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
      > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
      > at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
      > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
      > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
      > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
      > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
      > ... 20 more
      > Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
      > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
      > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
      > at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
      > at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
      > at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
      > at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
      > at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
      > at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
      > at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
      > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
      > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
      > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
      > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
      > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
      > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > ... 62 more
      > Caused by: java.lang.reflect.InvocationTargetException
      > at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
      > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
      > ... 78 more
      > Caused by: java.lang.NoClassDefFoundError: Lorg/apache/commons/csv/CSVFormat;
      > at java.lang.Class.getDeclaredFields0(Native Method)
      > at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
      > at java.lang.Class.getDeclaredFields(Class.java:1916)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
      > ... 82 more
      > Caused by: java.lang.ClassNotFoundException: org.apache.commons.csv.CSVFormat
      > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
      > at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
      > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
      > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
      > ... 88 more
      >
      > Any help would be great. I tried manually including CSVFormat from
      > Apache Commons, but it didn't change anything.
      >
      > Many thanks,
      > Chris Stevens
      > Head of Research & Development
      > +44 7565 034 595