Fwd: Re: Updating ValueState not working in hosted Kinesis

Timo Walther
Hi Chris,

[forwarding the private discussion to the mailing list again]

first of all, are you sure that your Sensor class is either a top-level
class or a static inner class? It seems there is way more stuff
in it (maybe included transitively by accident?), such as:

org.apache.logging.log4j.core.layout.AbstractCsvLayout
      > Serialization trace:
      > classes (sun.misc.Launcher$AppClassLoader)
      > classloader (java.security.ProtectionDomain)
      > cachedPDs (javax.security.auth.SubjectDomainCombiner)
      > combiner (java.security.AccessControlContext)
      > acc (sun.security.ssl.SSLSocketImpl)
      > connection (org.postgresql.core.PGStream)
      > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
      > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
      > commitQuery (org.postgresql.jdbc.PgConnection)
      > connection (org.postgresql.jdbc.PgResultSet)
      > val$rs

When declaring state you can use
`org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to
check if your state is a POJO type.
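
As a rough illustration of what "POJO type" means structurally, the sketch below checks a class with plain reflection. It is not Flink's own check (that is what `Types.POJO` performs) and only approximates the documented rules: public class, top-level or static nested, public no-argument constructor, and public fields. It is simplified in that it ignores the getter/setter alternative and superclass fields. `PojoShapeCheck`, `firstProblem` and the trimmed `Sensor` stand-in are hypothetical names introduced for this example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoShapeCheck {

    /** Stand-in for the Sensor class from this thread (fields trimmed). */
    public static class Sensor {
        public String sensorId;
        public String label;
    }

    /** Returns null if the class has a POJO-like shape, else the first problem found. */
    public static String firstProblem(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) {
            return "class is not public";
        }
        if (c.isAnonymousClass() || c.isLocalClass()
                || (c.isMemberClass() && !Modifier.isStatic(c.getModifiers()))) {
            return "class is not top-level or static nested";
        }
        try {
            c.getConstructor(); // looks up a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return "no public no-argument constructor";
        }
        for (Field f : c.getDeclaredFields()) {
            int m = f.getModifiers();
            // static and transient fields are not part of the serialized shape
            if (Modifier.isStatic(m) || Modifier.isTransient(m)) {
                continue;
            }
            // simplification: only public fields are accepted here
            if (!Modifier.isPublic(m)) {
                return "field '" + f.getName() + "' is not public";
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The plain class passes; an anonymous subclass of it does not.
        System.out.println(firstProblem(Sensor.class));
        System.out.println(firstProblem(new Sensor() {}.getClass()));
    }
}
```

The second call in `main` is the interesting one: an instance created with `new Sensor() {}` has a different runtime class than `Sensor`, and that class fails the shape check even though `Sensor` itself passes.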

Regards,
Timo


-------- Forwarded Message --------
Subject: Re: Updating ValueState not working in hosted Kinesis
Date: Wed, 19 Feb 2020 12:02:16 +0000
From: Chris Stevens <[hidden email]>
To: Timo Walther <[hidden email]>



Hi Timo,

Thanks for your reply. This makes sense to me. How do I treat something
as a POJO instead of a generic serialized black-box (BB) type? Sorry, I'm
relatively new to Java and Flink.

This is my full class def:

package sensingfeeling.models;
import java.io.Serializable;

public class Sensor implements Serializable {

      private static final long serialVersionUID = 8582433437601788991L;
      public String sensorId;
      public String companyId;
      public String label;
      // public Date createdAt;
      // public Date updatedAt;
      public Integer uncomfortableFaceLimit;
      public Boolean online;
      public String capabilityId;
      // public Date lastOnlineAt;
      // public Date lastOfflineAt;
      public Integer onlineVersionNumber;
      public int status;
      @Override
      public String toString(){
          return this.sensorId + " - " + this.label;
      }
}

Super simple really.

I'm not trying to upgrade anything as far as I know. Just making an
operator state aware.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 11:55, Timo Walther <[hidden email]> wrote:

     Hi Chris,

     it seems there are fields serialized into state that actually don't
     belong there. You should aim to treat Sensor as a POJO instead of a
     Kryo generic serialized black-box type.

     Furthermore, it seems that a field such as
     "org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be
     part of the state. Is there a "transient" keyword missing?

     Are you trying to upgrade your job or the Flink version?

     Regards,
     Timo



     On 18.02.20 18:59, Chris Stevens wrote:
      > Hi there,
      >
      > I'm trying to update state in one of my applications hosted in Kinesis
      > Data Analytics.
      >
      > private transient ValueState<Sensor> sensorState;
      > using sensorState.update(sensor);
      >
      > Get error:
      >
      > An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
      > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
      > at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
      > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
      > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
      > at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
      > at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
      > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
      > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
      > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
      > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
      > at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
      > at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
      > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
      > at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
      > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
      > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
      > at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
      > at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
      > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
      > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
      > at java.lang.Thread.run(Thread.java:748)
      > Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
      > Serialization trace:
      > classes (sun.misc.Launcher$AppClassLoader)
      > classloader (java.security.ProtectionDomain)
      > cachedPDs (javax.security.auth.SubjectDomainCombiner)
      > combiner (java.security.AccessControlContext)
      > acc (sun.security.ssl.SSLSocketImpl)
      > connection (org.postgresql.core.PGStream)
      > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
      > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
      > commitQuery (org.postgresql.jdbc.PgConnection)
      > connection (org.postgresql.jdbc.PgResultSet)
      > val$rs (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
      > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
      > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
      > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
      > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
      > at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
      > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
      > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
      > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
      > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
      > ... 20 more
      > Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
      > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
      > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
      > at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
      > at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
      > at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
      > at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
      > at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
      > at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
      > at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
      > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
      > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
      > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
      > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
      > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
      > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
      > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      > ... 62 more
      > Caused by: java.lang.reflect.InvocationTargetException
      > at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
      > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
      > ... 78 more
      > Caused by: java.lang.NoClassDefFoundError: Lorg/apache/commons/csv/CSVFormat;
      > at java.lang.Class.getDeclaredFields0(Native Method)
      > at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
      > at java.lang.Class.getDeclaredFields(Class.java:1916)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
      > at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
      > ... 82 more
      > Caused by: java.lang.ClassNotFoundException: org.apache.commons.csv.CSVFormat
      > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
      > at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
      > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
      > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
      > ... 88 more
      >
      > Any help would be great. I tried manually including CSVFormat from
      > apache commons but didn't change anything.
      >
      > Many thanks,
      > Chris Stevens
      > Head of Research & Development
      > +44 7565 034 595

Re: Re: Updating ValueState not working in hosted Kinesis

Chris Stevens
Thanks again Timo, I hope I replied correctly this time.

As per my previous message the Sensor class is a very simple POJO type (I think).

When the serialization trace talks about PGSql stuff, it makes me think that something from my operator is being included in serialization, not just the Sensor object itself, which I am explicitly including in state.

package sensingfeeling.functions.mapping;

public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {

    private static final long serialVersionUID = 8582433437601788991L;

    private transient ValueState<Sensor> sensorState;

    @Override
    public TypeA join(TypeB frame, TypeC activeMotionPaths) throws JsonProcessingException {

        Sensor sensor = sensorState.value();
        if (sensor == null) {
            LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
            sensor = getSensor(frame);
            sensorState.update(sensor);
        }

        return new TypeA();
    }

    @Override
    public void open(Configuration config) {
        LOG.debug("Sensor open method called", config);

        StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
            .cleanupInBackground()
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();

        ValueStateDescriptor<Sensor> sensorStateDescriptor = new ValueStateDescriptor<>("sensor", TypeInformation.of(new TypeHint<Sensor>(){}));
        // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
        sensorState = getRuntimeContext().getState(sensorStateDescriptor);

    }

    private Sensor getSensor(TypeB frame) throws Exception {

        Class.forName("org.postgresql.Driver");
        try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SELECT * from sensor where sensorid = '" + frame.sensorId + "'")) {

            if (rs.next()) {
                Sensor sensor = new Sensor() {};

                LOG.debug("Got sensor" + sensor);

                return sensor;
            }

        } catch (SQLException ex) {
            LOG.error("Error when connection postgres", ex);
            throw ex;
        }

        return null;
    }

}

Above is a cut-down version of my operator. I'm guessing it is the ResultSet rs that is getting serialized. How do I prevent this undesirable behaviour? I'm quite happy for my solution to serialize only what I explicitly tell it to; I don't need exactly-once or anything.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


Re: Updating ValueState not working in hosted Kinesis

Timo Walther
Hi Chris,

your observation is right. By writing `new Sensor() {}` instead of just
`new Sensor()`, you are creating an anonymous, non-static subclass that
references the outer method and class.
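
To illustrate the point about `new Sensor() {}`, here is a minimal, self-contained sketch in plain Java (no Flink or JDBC involved). The names `AnonymousCaptureDemo`, `FakeResultSet`, `plainSensor` and `anonymousSensor` are hypothetical and only stand in for the code from the thread. It shows that the anonymous form has a different runtime class and, when its body uses a local variable, keeps that variable in a compiler-generated field. That is presumably how `val$rs` ended up in the serialization trace.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class AnonymousCaptureDemo {

    /** Stand-in for the Sensor class from the thread, reduced to two fields. */
    public static class Sensor {
        public String sensorId;
        public String label;
    }

    /** Hypothetical stand-in for a heavyweight local such as a JDBC ResultSet. */
    public static class FakeResultSet {
        public final String label;

        public FakeResultSet(String label) {
            this.label = label;
        }
    }

    /** Plain instance: the runtime class is exactly Sensor. */
    public static Sensor plainSensor(FakeResultSet rs) {
        Sensor sensor = new Sensor();
        sensor.label = rs.label; // copy the value, keep no reference to rs
        return sensor;
    }

    /** Anonymous subclass: its body uses rs, so the compiler must store rs in a hidden field. */
    public static Sensor anonymousSensor(FakeResultSet rs) {
        return new Sensor() {
            @Override
            public String toString() {
                return "Sensor from " + rs.label;
            }
        };
    }

    /** Names of the fields declared by the object's own runtime class (inherited ones excluded). */
    public static List<String> declaredFieldNames(Object o) {
        List<String> names = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            names.add(f.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        FakeResultSet rs = new FakeResultSet("lobby");
        Sensor plain = plainSensor(rs);
        Sensor anon = anonymousSensor(rs);
        System.out.println(plain.getClass().getName() + " " + declaredFieldNames(plain));
        System.out.println(anon.getClass().getName() + " " + declaredFieldNames(anon));
    }
}
```

A generic field-by-field serializer that is handed the anonymous instance will follow that hidden field into whatever it references, whereas the plain instance only carries the values copied into it.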

If you check your logs, there might also be a reason why your POJO is
used as a generic type. I assume it is because you declared `implements
Serializable`, which makes Flink think "user wants to deal with
serialization of the POJO himself".
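
As a rough aid for the POJO question, the sketch below approximates the POJO rules from the Flink documentation with plain reflection: public standalone class, public no-argument constructor, and every field either public or reachable through a getter and setter. It is not Flink's own type extraction; `PojoRuleCheck` and `looksLikePojo` are hypothetical names, and only `getX`/`setX` accessors are considered. The authoritative check remains the `Types#POJO` call mentioned earlier in the thread.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoRuleCheck {

    /** Stand-in for the Sensor class from the thread, reduced to three fields. */
    public static class Sensor {
        public String sensorId;
        public String label;
        public int status;
    }

    /** Returns an anonymous subclass instance, as produced by `new Sensor() {}`. */
    public static Sensor anonymousSensor() {
        return new Sensor() {};
    }

    /** Approximate check of the documented POJO rules; not Flink's implementation. */
    public static boolean looksLikePojo(Class<?> c) {
        // Rule 1: public and standalone (top-level or static nested).
        if (!Modifier.isPublic(c.getModifiers())) return false;
        if (c.isAnonymousClass() || c.isLocalClass()) return false;
        if (c.isMemberClass() && !Modifier.isStatic(c.getModifiers())) return false;

        // Rule 2: a public constructor without arguments.
        try {
            c.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }

        // Rule 3: every non-static, non-transient field is public or has a getter and setter.
        for (Class<?> k = c; k != null && k != Object.class; k = k.getSuperclass()) {
            for (Field f : k.getDeclaredFields()) {
                int m = f.getModifiers();
                if (Modifier.isStatic(m) || Modifier.isTransient(m)) continue;
                if (!Modifier.isPublic(m) && !hasGetterAndSetter(k, f)) return false;
            }
        }
        return true;
    }

    /** Simplification: looks only for getX()/setX(T), not isX() for booleans. */
    private static boolean hasGetterAndSetter(Class<?> owner, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            owner.getMethod("get" + suffix);
            owner.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Sensor: " + looksLikePojo(Sensor.class));
        System.out.println("new Sensor() {}: " + looksLikePojo(anonymousSensor().getClass()));
    }
}
```

Running such a check on the runtime class of the object passed to `update(...)`, rather than on `Sensor.class`, would flag the anonymous subclass even though the declared state type is fine.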

Regards,
Timo


On 19.02.20 14:24, Chris Stevens wrote:

> Thanks again Timo, I hope I replied correctly this time.
>
> As per my previous message the Sensor class is a very simple POJO type
> (I think).
>
> When the serialization trace talks about PGSql stuff it makes me think
> that something from my operator is being included in serialization. Not
> just the Sensor object itself which I am explicitly including in state.
>
> package sensingfeeling.functions.mapping;
>
> public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {
>
>     private static final long serialVersionUID = 8582433437601788991L;
>
>     private transient ValueState<Sensor> sensorState;
>
>     @Override
>     public TypeA join(TypeB frame, TypeC activeMotionPaths)
>             throws JsonProcessingException {
>
>         Sensor sensor = sensorState.value();
>         if (sensor == null) {
>             LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
>             sensor = getSensor(frame);
>             sensorState.update(sensor);
>         }
>
>         return new TypeA();
>     }
>
>     @Override
>     public void open(Configuration config) {
>         LOG.debug("Sensor open method called", config);
>
>         StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
>                 .cleanupInBackground()
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();
>
>         ValueStateDescriptor<Sensor> sensorStateDescriptor =
>                 new ValueStateDescriptor<>("sensor",
>                         TypeInformation.of(new TypeHint<Sensor>() {}));
>         // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
>         sensorState = getRuntimeContext().getState(sensorStateDescriptor);
>
>     }
>
>     private Sensor getSensor(TypeB frame) throws Exception {
>
>         Class.forName("org.postgresql.Driver");
>         try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
>              Statement st = con.createStatement();
>              ResultSet rs = st.executeQuery("SELECT * from sensor where sensorid = '" +
>                      frame.sensorId + "'")) {
>
>             if (rs.next()) {
>                 Sensor sensor = new Sensor() {};
>
>                 LOG.debug("Got sensor" + sensor);
>
>                 return sensor;
>             }
>
>         } catch (SQLException ex) {
>             LOG.error("Error when connection postgres", ex);
>             throw ex;
>         }
>
>         return null;
>     }
>
> }
>
> Above is a cut down version of my operator, I'm guessing it is the
> ResultSet rs that is getting serialized. How do I prevent this
> undesirable behaviour? I'm quite happy for my solution to serialize only
> what I explicitly tell it to, I don't need exactly once or anything.
>
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595

Re: Updating ValueState not working in hosted Kinesis

Chris Stevens
Thanks so much Timo, got it working now. All down to my lack of Java skill.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 15:12, Timo Walther <[hidden email]> wrote:
Hi Chris,

your observation is right. By writing `new Sensor() {}` instead of just
`new Sensor()`, you are creating an anonymous, non-static subclass that
references the outer method and class.

If you check your logs, there might also be a reason why your POJO is
used as a generic type. I assume it is because you declared `implements
Serializable`, which makes Flink think "user wants to deal with
serialization of the POJO himself".

Regards,
Timo


On 19.02.20 14:24, Chris Stevens wrote:
> Thanks again Timo, I hope I replied correctly this time.
>
> As per my previous message the Sensor class is a very simple POJO type
> (I think).
>
> When the serialization trace talks about PGSql stuff it makes me think
> that something from my operator is being included in serialization. Not
> just the Sensor object itself which I am explicitly including in state.
>
> package sensingfeeling.functions.mapping;
>
> public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {
>
>     private static final long serialVersionUID = 8582433437601788991L;
>
>     private transient ValueState<Sensor> sensorState;
>
>     @Override
>     public TypeA join(TypeB frame, TypeC activeMotionPaths)
>             throws JsonProcessingException {
>
>         Sensor sensor = sensorState.value();
>         if (sensor == null) {
>             LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
>             sensor = getSensor(frame);
>             sensorState.update(sensor);
>         }
>
>         return new TypeA();
>     }
>
>     @Override
>     public void open(Configuration config) {
>         LOG.debug("Sensor open method called", config);
>
>         StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
>                 .cleanupInBackground()
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();
>
>         ValueStateDescriptor<Sensor> sensorStateDescriptor =
>                 new ValueStateDescriptor<>("sensor",
>                         TypeInformation.of(new TypeHint<Sensor>() {}));
>         // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
>         sensorState = getRuntimeContext().getState(sensorStateDescriptor);
>
>     }
>
>     private Sensor getSensor(TypeB frame) throws Exception {
>
>         Class.forName("org.postgresql.Driver");
>         try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
>              Statement st = con.createStatement();
>              ResultSet rs = st.executeQuery("SELECT * from sensor where sensorid = '" +
>                      frame.sensorId + "'")) {
>
>             if (rs.next()) {
>                 Sensor sensor = new Sensor() {};
>
>                 LOG.debug("Got sensor" + sensor);
>
>                 return sensor;
>             }
>
>         } catch (SQLException ex) {
>             LOG.error("Error when connection postgres", ex);
>             throw ex;
>         }
>
>         return null;
>     }
>
> }
>
> Above is a cut down version of my operator, I'm guessing it is the
> ResultSet rs that is getting serialized. How do I prevent this
> undesirable behaviour? I'm quite happy for my solution to serialize only
> what I explicitly tell it to, I don't need exactly once or anything.
>
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595
>
>
> On Wed, 19 Feb 2020 at 12:19, Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi Chris,
>
>     [forwarding the private discussion to the mailing list again]
>
>     first of all, are you sure that your Sensor class is either a top-level
>     class or a static inner class. Because it seems there is way more stuff
>     in it (maybe included by accident transitively?). Such as:
>
>     org.apache.logging.log4j.core.layout.AbstractCsvLayout
>            > Serialization trace:
>            > classes (sun.misc.Launcher$AppClassLoader)
>            > classloader (java.security.ProtectionDomain)
>            > cachedPDs (javax.security.auth.SubjectDomainCombiner)
>            > combiner (java.security.AccessControlContext)
>            > acc (sun.security.ssl.SSLSocketImpl)
>            > connection (org.postgresql.core.PGStream)
>            > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
>            > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
>            > commitQuery (org.postgresql.jdbc.PgConnection)
>            > connection (org.postgresql.jdbc.PgResultSet)
>            > val$rs
>
>     When declaring state you can use
>     `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to
>
>     check if your state is a POJO type.
>
>     Regards,
>     Timo
>
>
>     -------- Forwarded Message --------
>     Subject:        Re: Updating ValueState not working in hosted Kinesis
>     Date:   Wed, 19 Feb 2020 12:02:16 +0000
>     From:   Chris Stevens <[hidden email]
>     <mailto:[hidden email]>>
>     To:     Timo Walther <[hidden email] <mailto:[hidden email]>>
>
>
>
>     Hi Timo,
>
>     Thanks for your reply. This makes sense to me, how do I treat something
>     as a POJO instead of a generic serialized BB type? Sorry relatively new
>     to Java and Flink.
>
>     This is my full class def:
>
>     package sensingfeeling.models;
>     import java.io.Serializable;
>
>     public class Sensor implements Serializable {
>
>            private static final long serialVersionUID =
>     8582433437601788991L;
>            public String sensorId;
>            public String companyId;
>            public String label;
>            // public Date createdAt;
>            // public Date updatedAt;
>            public Integer uncomfortableFaceLimit;
>            public Boolean online;
>            public String capabilityId;
>            // public Date lastOnlineAt;
>            // public Date lastOfflineAt;
>            public Integer onlineVersionNumber;
>            public int status;
>            @Override
>            public String toString(){
>                return this.sensorId + " - " + this.label;
>            }
>     }
>
>     Super simple really.
>
>     I'm not trying to upgrade anything as far as I know. Just making an
>     operator state aware.
>
>     Many thanks,
>     Chris Stevens
>     Head of Research & Development
>     +44 7565 034 595
>
>
>     On Wed, 19 Feb 2020 at 11:55, Timo Walther <[hidden email]
>     <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>
>           Hi Chris,
>
>           it seems there are field serialized into state that actually don't
>           belong there. You should aim to treat Sensor as a POJO instead
>     of a
>           Kryo
>           generic serialized black-box type.
>
>           Furthermore, it seems that field such as
>           "org.apache.logging.log4j.core.layout.AbstractCsvLayout"
>     should not be
>           state. Is there a "transient" keyword missing?
>
>           Are you trying to upgrade your job or the Flink version?
>
>           Regards,
>           Timo
>
>
>
>           On 18.02.20 18:59, Chris Stevens wrote:
>            > Hi there,
>            >
>            > I'm trying to update state in one of my applications hosted in
>           Kinesis
>            > Data Analytics.
>            >
>            > private transient ValueState<Sensor> sensorState;
>            > using sensorState.update(sensor);
>            >
>            > Get error:
>            >
>            > An error occurred: org.apache.flink.util.FlinkRuntimeException:
>           Error
>            > while adding data to RocksDB
>            > at
>            >
>
>     org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
>            > at
>            >
>
>     org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
>            > at
>            >
>
>     sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
>            > at
>            >
>
>     sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
>            > at
>            >
>
>     org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
>            > at
>            >
>
>     org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
>            > at
>            >
>
>     org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>            > at
>            >
>
>     org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>            > at
>            >
>
>     org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
>            > at
>            >
>
>     org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
>            > at
>            >
>
>     org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
>            > at
>            >
>
>     org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>            > at
>            >
>
>     org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
>            > at
>            > org.apache.flink.streaming.runtime.io
>     <http://org.apache.flink.streaming.runtime.io>
>
>     <http://runtime.io>.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>            > at
>            >
>
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>            > at
>            >
>
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>            > at
>            >
>
>     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>            > at
>            >
>
>     org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>            > at
>            >
>
>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>            > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
>            > at java.lang.Thread.run(Thread.java:748)
>            > Caused by: com.esotericsoftware.kryo.KryoException:
>            > java.lang.IllegalArgumentException: Unable to create serializer
>            > "com.esotericsoftware.kryo.serializers.FieldSerializer" for
>     class:
>            > org.apache.logging.log4j.core.layout.AbstractCsvLayout
>            > Serialization trace:
>            > classes (sun.misc.Launcher$AppClassLoader)
>            > classloader (java.security.ProtectionDomain)
>            > cachedPDs (javax.security.auth.SubjectDomainCombiner)
>            > combiner (java.security.AccessControlContext)
>            > acc (sun.security.ssl.SSLSocketImpl)
>            > connection (org.postgresql.core.PGStream)
>            > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
>            > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
>            > commitQuery (org.postgresql.jdbc.PgConnection)
>            > connection (org.postgresql.jdbc.PgResultSet)
>            > val$rs
>            >
>
>     (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at
>     com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at
>     com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at
>     com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>            > at
>            >
>
>     org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
>            > at
>            >
>
>     org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
>            > at
>            >
>
>     org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
>            > at
>            >
>
>     org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
>            > at
>            >
>
>     org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
>            > at
>            >
>
>     org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
>            > at
>            >
>
>     org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
>            > ... 20 more
>            > Caused by: java.lang.IllegalArgumentException: Unable to create
>            > serializer
>           "com.esotericsoftware.kryo.serializers.FieldSerializer" for
>            > class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
>            > at
>            >
>
>     com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
>            > at
>            >
>
>     com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
>            > at
>     com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
>            > at
>     com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
>            > at
>     com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
>            > at
>            >
>
>     com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
>            > at
>     com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
>            > at
>     com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
>            > at
>            >
>
>     com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
>            > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
>            > at
>     com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > ... 62 more
>            > Caused by: java.lang.reflect.InvocationTargetException
>            > at
>     sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
>            > at
>            >
>
>     sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>            > at
>     java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>            > at
>            >
>
>     com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
>            > ... 78 more
>            > Caused by: java.lang.NoClassDefFoundError:
>            > Lorg/apache/commons/csv/CSVFormat;
>            > at java.lang.Class.getDeclaredFields0(Native Method)
>            > at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>            > at java.lang.Class.getDeclaredFields(Class.java:1916)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
>            > at
>            >
>
>     com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
>            > ... 82 more
>            > Caused by: java.lang.ClassNotFoundException:
>            > org.apache.commons.csv.CSVFormat
>            > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>            > at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>            > at
>     sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>            > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>            > ... 88 more
>            >
>            > Any help would be great. I tried manually including CSVFormat
>            > from Apache Commons, but that didn't change anything.
>            >
>            > Many thanks,
>            > Chris Stevens
>            > Head of Research & Development
>            > +44 7565 034 595
>