Re: Updating ValueState not working in hosted Kinesis

Posted by Chris Stevens on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Fwd-Re-Updating-ValueState-not-working-in-hosted-Kinesis-tp32905p32917.html

Thanks so much Timo, got it working now. All down to my lack of Java skill.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 15:12, Timo Walther <[hidden email]> wrote:
Hi Chris,

your observation is right. By writing `new Sensor() {}` instead of just
`new Sensor()`, you are creating an anonymous, non-static inner class that
holds references to the enclosing method's variables and the enclosing
class instance.
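
To illustrate the difference, here is a minimal sketch in plain Java (no
Flink or JDBC involved). The `Sensor` class, the `row` parameter and the
helper names are hypothetical stand-ins, not the code from this thread.
It assumes a javac-style compiler, which stores captured local variables
in synthetic fields (compare the `val$rs` entry in the serialization
trace quoted further down). Inside an instance method the anonymous
class may additionally keep a reference to the enclosing object.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Sensor POJO discussed in this thread.
class Sensor {
    public String sensorId;
    public String label;
}

class AnonymousCaptureDemo {

    // Mirrors the problematic pattern: the trailing {} creates an anonymous
    // subclass of Sensor, and the initializer block captures the local `row`.
    static Sensor anonymousSensor(Object row) {
        return new Sensor() {
            {
                sensorId = String.valueOf(row);
            }
        };
    }

    // Plain instantiation: the runtime class is Sensor itself, nothing is captured.
    static Sensor plainSensor(Object row) {
        Sensor sensor = new Sensor();
        sensor.sensorId = String.valueOf(row);
        return sensor;
    }

    // Names of compiler-generated (synthetic) fields declared on the object's own class.
    static List<String> syntheticFields(Object o) {
        List<String> names = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.isSynthetic()) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Sensor anon = anonymousSensor("row-1");
        Sensor plain = plainSensor("row-1");
        System.out.println("anonymous class? " + anon.getClass().isAnonymousClass());
        System.out.println("captured fields:  " + syntheticFields(anon));
        System.out.println("plain is Sensor?  " + (plain.getClass() == Sensor.class));
        System.out.println("captured fields:  " + syntheticFields(plain));
    }
}
```

A serializer that walks all fields of the anonymous instance will also
follow the captured references, which is how unrelated objects can end
up in state.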

If you check your logs, there might also be a reason given for why your
POJO is treated as a generic type. I assume it is because you declared
`implements Serializable`, which makes Flink think "the user wants to
handle serialization of the POJO themselves".
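
As a rough aid for reading such log messages, the following sketch checks
a class against a simplified version of the POJO rules as I understand
them from the Flink documentation (public class, public no-argument
constructor, fields either public or reachable through a getter and
setter). It is a hand-written approximation in plain Java, not Flink's
own type extraction, and all names in it (`PojoRuleSketch`, `SensorPojo`,
`violations`) are hypothetical. The `Types#POJO` call mentioned elsewhere
in this thread remains the authoritative check.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class PojoRuleSketch {

    // Hypothetical POJO used only for this illustration.
    public static class SensorPojo {
        public String sensorId;
        public int status;

        public SensorPojo() {}
    }

    // Returns human-readable rule violations; an empty list means the simplified rules pass.
    static List<String> violations(Class<?> type) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(type.getModifiers())) {
            problems.add("class is not public");
        }
        if (type.isAnonymousClass() || type.isLocalClass()
                || (type.isMemberClass() && !Modifier.isStatic(type.getModifiers()))) {
            problems.add("class is anonymous, local, or a non-static inner class");
        }
        try {
            type.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Walk the class hierarchy and inspect every instance field that would be serialized.
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                    continue;
                }
                if (!Modifier.isPublic(mods) && !hasAccessors(c, f)) {
                    problems.add("field '" + f.getName() + "' is not public and has no getter/setter");
                }
            }
        }
        return problems;
    }

    // True if the owner exposes public getX() and setX(type) methods for the field.
    private static boolean hasAccessors(Class<?> owner, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            owner.getMethod("get" + suffix);
            owner.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(violations(SensorPojo.class));
        Object anonymous = new SensorPojo() {};
        System.out.println(violations(anonymous.getClass()));
    }
}
```

In this sketch the plain class passes, while the anonymous subclass
created with `{}` is reported as not public and anonymous.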

Regards,
Timo


On 19.02.20 14:24, Chris Stevens wrote:
> Thanks again Timo, I hope I replied correctly this time.
>
> As per my previous message the Sensor class is a very simple POJO type
> (I think).
>
> When the serialization trace mentions PGSql classes, it makes me think
> that something from my operator is being included in serialization, not
> just the Sensor object itself, which I am explicitly putting in state.
>
> package sensingfeeling.functions.mapping;
>
> public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {
>
>     private static final long serialVersionUID = 8582433437601788991L;
>
>     private transient ValueState<Sensor> sensorState;
>
>     @Override
>     public TypeA join(TypeB frame, TypeC activeMotionPaths)
>             throws JsonProcessingException {
>
>         Sensor sensor = sensorState.value();
>         if (sensor == null) {
>             LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
>             sensor = getSensor(frame);
>             sensorState.update(sensor);
>         }
>
>         return new TypeA();
>     }
>
>     @Override
>     public void open(Configuration config) {
>         LOG.debug("Sensor open method called", config);
>
>         StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
>                 .cleanupInBackground()
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
>                 .build();
>
>         ValueStateDescriptor<Sensor> sensorStateDescriptor =
>                 new ValueStateDescriptor<>("sensor",
>                         TypeInformation.of(new TypeHint<Sensor>() {}));
>         // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
>         sensorState = getRuntimeContext().getState(sensorStateDescriptor);
>
>     }
>
>     private Sensor getSensor(TypeB frame) throws Exception {
>
>         Class.forName("org.postgresql.Driver");
>         try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
>                 Statement st = con.createStatement();
>                 ResultSet rs = st.executeQuery("SELECT * from sensor where sensorid = '"
>                         + frame.sensorId + "'")) {
>
>             if (rs.next()) {
>                 Sensor sensor = new Sensor() {};
>
>                 LOG.debug("Got sensor" + sensor);
>
>                 return sensor;
>             }
>
>         } catch (SQLException ex) {
>             LOG.error("Error when connection postgres", ex);
>             throw ex;
>         }
>
>         return null;
>     }
>
> }
>
> Above is a cut-down version of my operator. I'm guessing it is the
> ResultSet rs that is getting serialized. How do I prevent this
> undesirable behaviour? I'm quite happy for my solution to serialize only
> what I explicitly tell it to; I don't need exactly-once or anything.
>
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595
>
>
> On Wed, 19 Feb 2020 at 12:19, Timo Walther <[hidden email]> wrote:
>
>     Hi Chris,
>
>     [forwarding the private discussion to the mailing list again]
>
>     first of all, are you sure that your Sensor class is either a top-level
>     class or a static inner class? It seems there is way more stuff in it
>     (maybe included by accident transitively?), such as:
>
>     org.apache.logging.log4j.core.layout.AbstractCsvLayout
>            > Serialization trace:
>            > classes (sun.misc.Launcher$AppClassLoader)
>            > classloader (java.security.ProtectionDomain)
>            > cachedPDs (javax.security.auth.SubjectDomainCombiner)
>            > combiner (java.security.AccessControlContext)
>            > acc (sun.security.ssl.SSLSocketImpl)
>            > connection (org.postgresql.core.PGStream)
>            > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
>            > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
>            > commitQuery (org.postgresql.jdbc.PgConnection)
>            > connection (org.postgresql.jdbc.PgResultSet)
>            > val$rs
>
>     When declaring state you can use
>     `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to
>     check if your state is a POJO type.
>
>     Regards,
>     Timo
>
>
>     -------- Forwarded Message --------
>     Subject:        Re: Updating ValueState not working in hosted Kinesis
>     Date:   Wed, 19 Feb 2020 12:02:16 +0000
>     From:   Chris Stevens <[hidden email]>
>     To:     Timo Walther <[hidden email]>
>
>
>
>     Hi Timo,
>
>     Thanks for your reply. This makes sense to me. How do I treat something
>     as a POJO instead of a generic serialized black-box type? Sorry, I'm
>     relatively new to Java and Flink.
>
>     This is my full class def:
>
>     package sensingfeeling.models;
>     import java.io.Serializable;
>
>     public class Sensor implements Serializable {
>
>            private static final long serialVersionUID = 8582433437601788991L;
>            public String sensorId;
>            public String companyId;
>            public String label;
>            // public Date createdAt;
>            // public Date updatedAt;
>            public Integer uncomfortableFaceLimit;
>            public Boolean online;
>            public String capabilityId;
>            // public Date lastOnlineAt;
>            // public Date lastOfflineAt;
>            public Integer onlineVersionNumber;
>            public int status;
>            @Override
>            public String toString(){
>                return this.sensorId + " - " + this.label;
>            }
>     }
>
>     Super simple really.
>
>     I'm not trying to upgrade anything as far as I know. Just making an
>     operator state aware.
>
>     Many thanks,
>     Chris Stevens
>     Head of Research & Development
>     +44 7565 034 595
>
>
>     On Wed, 19 Feb 2020 at 11:55, Timo Walther <[hidden email]> wrote:
>
>           Hi Chris,
>
>           it seems there are fields serialized into state that actually don't
>           belong there. You should aim to treat Sensor as a POJO instead of a
>           Kryo generic serialized black-box type.
>
>           Furthermore, it seems that a field such as
>           "org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not
>           be state. Is there a "transient" keyword missing?
>
>           Are you trying to upgrade your job or the Flink version?
>
>           Regards,
>           Timo
>
>
>
>           On 18.02.20 18:59, Chris Stevens wrote:
>            > Hi there,
>            >
>            > I'm trying to update state in one of my applications hosted in
>            > Kinesis Data Analytics. The state is declared as
>            >
>            > private transient ValueState<Sensor> sensorState;
>            >
>            > and updated using sensorState.update(sensor);
>            >
>            > I get this error:
>            >
>            > An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
>            > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
>            > at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
>            > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
>            > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
>            > at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
>            > at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
>            > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>            > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>            > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
>            > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
>            > at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
>            > at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>            > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
>            > at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>            > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>            > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>            > at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>            > at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>            > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>            > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
>            > at java.lang.Thread.run(Thread.java:748)
>            > Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
>            > Serialization trace:
>            > classes (sun.misc.Launcher$AppClassLoader)
>            > classloader (java.security.ProtectionDomain)
>            > cachedPDs (javax.security.auth.SubjectDomainCombiner)
>            > combiner (java.security.AccessControlContext)
>            > acc (sun.security.ssl.SSLSocketImpl)
>            > connection (org.postgresql.core.PGStream)
>            > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
>            > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
>            > commitQuery (org.postgresql.jdbc.PgConnection)
>            > connection (org.postgresql.jdbc.PgResultSet)
>            > val$rs (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>            > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
>            > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>            > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>            > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
>            > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
>            > at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
>            > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
>            > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
>            > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
>            > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
>            > ... 20 more
>            > Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
>            > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
>            > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
>            > at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
>            > at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
>            > at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
>            > at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
>            > at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
>            > at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
>            > at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
>            > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
>            > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
>            > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
>            > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>            > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
>            > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
>            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>            > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>            > ... 62 more
>            > Caused by: java.lang.reflect.InvocationTargetException
>            > at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
>            > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>            > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>            > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
>            > ... 78 more
>            > Caused by: java.lang.NoClassDefFoundError: Lorg/apache/commons/csv/CSVFormat;
>            > at java.lang.Class.getDeclaredFields0(Native Method)
>            > at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>            > at java.lang.Class.getDeclaredFields(Class.java:1916)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
>            > at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
>            > ... 82 more
>            > Caused by: java.lang.ClassNotFoundException: org.apache.commons.csv.CSVFormat
>            > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>            > at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>            > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>            > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>            > ... 88 more
>            >
>            > Any help would be great. I tried manually including CSVFormat from
>            > Apache Commons, but it didn't change anything.
>            >
>            > Many thanks,
>            > Chris Stevens
>            > Head of Research & Development
>            > +44 7565 034 595
>