http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Fwd-Re-Updating-ValueState-not-working-in-hosted-Kinesis-tp32905p32911.html
your observation is right. By `new Sensor() {}` instead of just `new
used as a generic type. I assume because you declared `implements
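
To illustrate the `new Sensor() {}` point, here is a minimal plain-Java sketch with no Flink or JDBC dependency. The names `AnonymousCaptureDemo`, `viaAnonymousSubclass`, `viaPlainConstructor` and `syntheticFields` are made up for this example, and a `Map` stands in for the `ResultSet`. It assumes the real operator reads the result set inside the anonymous class body, which would explain the `val$rs` entry on `FrameMotionPathsToTelemetryJoinFunction$4` in the serialization trace; the cut-down code quoted below has an empty body, so that part is a guess.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AnonymousCaptureDemo {

    // Stand-in for the Sensor POJO from the thread (only two fields kept).
    public static class Sensor {
        public String sensorId;
        public String label;
    }

    // Anonymous subclass: the initializer block reads the local variable `row`,
    // so the compiler stores a reference to it in a hidden (synthetic) field.
    public static Sensor viaAnonymousSubclass(Map<String, String> row) {
        return new Sensor() {
            {
                sensorId = row.get("sensorid");
                label = row.get("label");
            }
        };
    }

    // Plain instantiation: the runtime class is exactly Sensor, nothing is captured.
    public static Sensor viaPlainConstructor(Map<String, String> row) {
        Sensor sensor = new Sensor();
        sensor.sensorId = row.get("sensorid");
        sensor.label = row.get("label");
        return sensor;
    }

    // Names of compiler-generated fields declared on the object's runtime class.
    public static List<String> syntheticFields(Object o) {
        List<String> names = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.isSynthetic()) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("sensorid", "s-1");
        row.put("label", "lobby");

        Sensor anon = viaAnonymousSubclass(row);
        Sensor plain = viaPlainConstructor(row);

        // The anonymous instance carries the captured local; the plain one does not.
        System.out.println(anon.getClass().getName() + " " + syntheticFields(anon));
        System.out.println(plain.getClass().getName() + " " + syntheticFields(plain));
    }
}
```

A field-by-field serializer such as Kryo would follow that hidden reference, which is how a connection, its socket and a class loader can end up in the object graph. Creating the object with a plain `new Sensor()` and assigning the fields afterwards avoids the capture.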
> Thanks again Timo, I hope I replied correctly this time.
>
> As per my previous message the Sensor class is a very simple POJO type
> (I think).
>
> When the serialization trace talks about PGSql stuff it makes me think
> that something from my operator is being included in serialization, not
> just the Sensor object itself, which I am explicitly including in state.
>
> package sensingfeeling.functions.mapping;
>
> public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {
>
>     private static final long serialVersionUID = 8582433437601788991L;
>
>     private transient ValueState<Sensor> sensorState;
>
>     @Override
>     public TypeA join(TypeB frame, TypeC activeMotionPaths)
>             throws JsonProcessingException {
>
>         Sensor sensor = sensorState.value();
>         if (sensor == null) {
>             LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
>             sensor = getSensor(frame);
>             sensorState.update(sensor);
>         }
>
>         return new TypeA();
>     }
>
>     @Override
>     public void open(Configuration config) {
>         LOG.debug("Sensor open method called", config);
>
>         StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
>                 .cleanupInBackground()
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();
>
>         ValueStateDescriptor<Sensor> sensorStateDescriptor =
>                 new ValueStateDescriptor<>("sensor",
>                         TypeInformation.of(new TypeHint<Sensor>() {}));
>         // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
>         sensorState = getRuntimeContext().getState(sensorStateDescriptor);
>
>     }
>
>     private Sensor getSensor(TypeB frame) throws Exception {
>
>         Class.forName("org.postgresql.Driver");
>         try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
>                 Statement st = con.createStatement();
>                 ResultSet rs = st.executeQuery("SELECT * from sensor where sensorid = '"
>                         + frame.sensorId + "'")) {
>
>             if (rs.next()) {
>                 Sensor sensor = new Sensor() {};
>
>                 LOG.debug("Got sensor" + sensor);
>
>                 return sensor;
>             }
>
>         } catch (SQLException ex) {
>             LOG.error("Error when connection postgres", ex);
>             throw ex;
>         }
>
>         return null;
>     }
>
> }
>
> Above is a cut-down version of my operator. I'm guessing it is the
> ResultSet rs that is getting serialized. How do I prevent this
> undesirable behaviour? I'm quite happy for my solution to serialize only
> what I explicitly tell it to; I don't need exactly-once or anything.
>
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595
>
>
> On Wed, 19 Feb 2020 at 12:19, Timo Walther <[hidden email]> wrote:
>
> Hi Chris,
>
> [forwarding the private discussion to the mailing list again]
>
> First of all, are you sure that your Sensor class is either a top-level
> class or a static inner class? It seems there is way more stuff
> in it (maybe included by accident transitively?), such as:
>
> org.apache.logging.log4j.core.layout.AbstractCsvLayout
> > Serialization trace:
> > classes (sun.misc.Launcher$AppClassLoader)
> > classloader (java.security.ProtectionDomain)
> > cachedPDs (javax.security.auth.SubjectDomainCombiner)
> > combiner (java.security.AccessControlContext)
> > acc (sun.security.ssl.SSLSocketImpl)
> > connection (org.postgresql.core.PGStream)
> > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> > commitQuery (org.postgresql.jdbc.PgConnection)
> > connection (org.postgresql.jdbc.PgResultSet)
> > val$rs
>
> When declaring state you can use
> `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to
> check if your state is a POJO type.
>
> Regards,
> Timo
>
>
> -------- Forwarded Message --------
> Subject: Re: Updating ValueState not working in hosted Kinesis
> Date: Wed, 19 Feb 2020 12:02:16 +0000
> From: Chris Stevens <[hidden email]>
> To: Timo Walther <[hidden email]>
>
>
>
> Hi Timo,
>
> Thanks for your reply. This makes sense to me. How do I treat something
> as a POJO instead of a generic serialized black-box type? Sorry, I'm
> relatively new to Java and Flink.
>
> This is my full class def:
>
> package sensingfeeling.models;
> import java.io.Serializable;
>
> public class Sensor implements Serializable {
>
> private static final long serialVersionUID =
> 8582433437601788991L;
> public String sensorId;
> public String companyId;
> public String label;
> // public Date createdAt;
> // public Date updatedAt;
> public Integer uncomfortableFaceLimit;
> public Boolean online;
> public String capabilityId;
> // public Date lastOnlineAt;
> // public Date lastOfflineAt;
> public Integer onlineVersionNumber;
> public int status;
> @Override
> public String toString(){
> return this.sensorId + " - " + this.label;
> }
> }
>
> Super simple really.
>
> I'm not trying to upgrade anything as far as I know. Just making an
> operator state aware.
>
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595
>
>
> On Wed, 19 Feb 2020 at 11:55, Timo Walther <[hidden email]> wrote:
>
> Hi Chris,
>
> it seems there are fields serialized into state that actually don't
> belong there. You should aim to treat Sensor as a POJO instead of a
> Kryo generic serialized black-box type.
>
> Furthermore, it seems that fields such as
> "org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be
> in state. Is there a "transient" keyword missing?
>
> Are you trying to upgrade your job or the Flink version?
>
> Regards,
> Timo
>
>
>
> On 18.02.20 18:59, Chris Stevens wrote:
> > Hi there,
> >
> > I'm trying to update state in one of my applications hosted in
> > Kinesis Data Analytics.
> >
> > private transient ValueState<Sensor> sensorState;
> > using sensorState.update(sensor);
> >
> > Get error:
> >
> > An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
> >     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
> >     at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
> >     at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
> >     at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
> >     at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
> >     at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
> >     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> >     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
> >     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
> >     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
> > Serialization trace:
> > classes (sun.misc.Launcher$AppClassLoader)
> > classloader (java.security.ProtectionDomain)
> > cachedPDs (javax.security.auth.SubjectDomainCombiner)
> > combiner (java.security.AccessControlContext)
> > acc (sun.security.ssl.SSLSocketImpl)
> > connection (org.postgresql.core.PGStream)
> > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> > commitQuery (org.postgresql.jdbc.PgConnection)
> > connection (org.postgresql.jdbc.PgResultSet)
> > val$rs (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> >     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
> >     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> >     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
> >     at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
> >     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
> >     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
> >     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
> >     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
> >     ... 20 more
> > Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
> >     at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
> >     at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
> >     at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
> >     at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
> >     at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
> >     at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
> >     at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
> >     at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
> >     at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> >     at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> >     at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
> >     at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
> >     at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> >     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
> >     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >     ... 62 more
> > Caused by: java.lang.reflect.InvocationTargetException
> >     at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
> >     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >     at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
> >     ... 78 more
> > Caused by: java.lang.NoClassDefFoundError: Lorg/apache/commons/csv/CSVFormat;
> >     at java.lang.Class.getDeclaredFields0(Native Method)
> >     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
> >     at java.lang.Class.getDeclaredFields(Class.java:1916)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
> >     ... 82 more
> > Caused by: java.lang.ClassNotFoundException: org.apache.commons.csv.CSVFormat
> >     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> >     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> >     ... 88 more
> >
> > Any help would be great. I tried manually including CSVFormat from
> > Apache Commons, but it didn't change anything.
> >
> > Many thanks,
> > Chris Stevens
> > Head of Research & Development
> > +44 7565 034 595
>
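
On the quoted suggestion to check whether the state type is a POJO: inside a Flink job that check is `Types.POJO(Sensor.class)`, as mentioned in the thread. For a quick look without Flink on the classpath, the sketch below approximates the documented POJO shape rules with plain reflection. It is not Flink's implementation; `PojoShapeCheck` and `firstViolation` are hypothetical names, and the field rule is simplified to "public and non-final" (Flink also accepts private fields with getters and setters).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Optional;

public class PojoShapeCheck {

    // Stand-in for the Sensor class from the thread (trimmed to three fields).
    public static class Sensor {
        public String sensorId;
        public String label;
        public int status;
    }

    // Returns the first rule the class breaks, or empty if it looks POJO-shaped.
    // Simplified approximation, see the assumptions in the lead-in.
    public static Optional<String> firstViolation(Class<?> c) {
        if (c.isAnonymousClass() || c.isLocalClass()) {
            return Optional.of("anonymous or local class");
        }
        if (c.isMemberClass() && !Modifier.isStatic(c.getModifiers())) {
            return Optional.of("non-static inner class");
        }
        if (!Modifier.isPublic(c.getModifiers())) {
            return Optional.of("class is not public");
        }
        try {
            c.getConstructor(); // finds public constructors only
        } catch (NoSuchMethodException e) {
            return Optional.of("no public no-argument constructor");
        }
        for (Field f : c.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m)) {
                continue; // static and transient fields are not part of the data
            }
            if (!Modifier.isPublic(m) || Modifier.isFinal(m)) {
                return Optional.of("field '" + f.getName() + "' is not public and non-final");
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Sensor plain = new Sensor();
        Sensor anonymous = new Sensor() {};

        System.out.println("plain:     " + firstViolation(plain.getClass()).orElse("looks like a POJO"));
        System.out.println("anonymous: " + firstViolation(anonymous.getClass()).orElse("looks like a POJO"));
    }
}
```

The check runs on the runtime class of the instance rather than on `Sensor.class`, because that is what differs when the object was created as `new Sensor() {}`.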