Hi Chris,
[forwarding the private discussion to the mailing list again]

First of all, are you sure that your Sensor class is either a top-level class or a static inner class? It seems there is far more in it than expected (maybe included transitively by accident), such as:

org.apache.logging.log4j.core.layout.AbstractCsvLayout
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
cachedPDs (javax.security.auth.SubjectDomainCombiner)
combiner (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
connection (org.postgresql.core.PGStream)
pgStream (org.postgresql.core.v3.QueryExecutorImpl)
transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
commitQuery (org.postgresql.jdbc.PgConnection)
connection (org.postgresql.jdbc.PgResultSet)
val$rs

When declaring state, you can use `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to check whether your state is a POJO type.

Regards,
Timo

-------- Forwarded Message --------
Subject: Re: Updating ValueState not working in hosted Kinesis
Date: Wed, 19 Feb 2020 12:02:16 +0000
From: Chris Stevens <[hidden email]>
To: Timo Walther <[hidden email]>

Hi Timo,

Thanks for your reply. This makes sense to me. How do I treat something as a POJO instead of a generic serialized black-box type? Sorry, I'm relatively new to Java and Flink.

This is my full class def:

package sensingfeeling.models;

import java.io.Serializable;

public class Sensor implements Serializable {

    private static final long serialVersionUID = 8582433437601788991L;

    public String sensorId;
    public String companyId;
    public String label;
    // public Date createdAt;
    // public Date updatedAt;
    public Integer uncomfortableFaceLimit;
    public Boolean online;
    public String capabilityId;
    // public Date lastOnlineAt;
    // public Date lastOfflineAt;
    public Integer onlineVersionNumber;
    public int status;

    @Override
    public String toString() {
        return this.sensorId + " - " + this.label;
    }
}

Super simple really.

I'm not trying to upgrade anything as far as I know. Just making an operator state aware.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595

On Wed, 19 Feb 2020 at 11:55, Timo Walther <[hidden email]> wrote:

Hi Chris,

it seems there are fields serialized into state that don't actually belong there. You should aim to treat Sensor as a POJO instead of a Kryo generic serialized black-box type.

Furthermore, it seems that a field such as "org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be state. Is there a "transient" keyword missing?

Are you trying to upgrade your job or the Flink version?

Regards,
Timo

On 18.02.20 18:59, Chris Stevens wrote:
> Hi there,
>
> I'm trying to update state in one of my applications hosted in Kinesis Data Analytics.
>
> private transient ValueState<Sensor> sensorState;
> using sensorState.update(sensor);
>
> Get error:
>
> An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
>     at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
>     at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
>     at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
>     at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
>     at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> cachedPDs (javax.security.auth.SubjectDomainCombiner)
> combiner (java.security.AccessControlContext)
> acc (sun.security.ssl.SSLSocketImpl)
> connection (org.postgresql.core.PGStream)
> pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> commitQuery (org.postgresql.jdbc.PgConnection)
> connection (org.postgresql.jdbc.PgResultSet)
> val$rs (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
>     at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
>     ... 20 more
> Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
>     at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
>     at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
>     at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
>     at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
>     at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
>     at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
>     at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
>     at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
>     at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>     ... 62 more
> Caused by: java.lang.reflect.InvocationTargetException
>     at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
>     ... 78 more
> Caused by: java.lang.NoClassDefFoundError: Lorg/apache/commons/csv/CSVFormat;
>     at java.lang.Class.getDeclaredFields0(Native Method)
>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>     at java.lang.Class.getDeclaredFields(Class.java:1916)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
>     ... 82 more
> Caused by: java.lang.ClassNotFoundException: org.apache.commons.csv.CSVFormat
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>     ... 88 more
>
> Any help would be great. I tried manually including CSVFormat from apache commons but didn't change anything.
>
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595
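The POJO check Timo mentions above can be roughly approximated without Flink on the classpath. The sketch below encodes the POJO rules as described in the Flink documentation (public class, not a non-static inner class, public no-argument constructor, every non-static, non-transient field either public or reachable through a getter and setter) using only the standard library. `PojoShapeCheck`, `looksLikePojo`, and the cut-down `Sensor` are hypothetical names for illustration; this is not Flink's own type extraction, which is what `Types.POJO(Sensor.class)` would actually exercise.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoShapeCheck {

    /** Cut-down stand-in for the Sensor class from the thread. */
    public static class Sensor {
        public String sensorId;
        public String label;
        public int status;
    }

    /** Rough approximation of Flink's documented POJO rules; not Flink's real check. */
    public static boolean looksLikePojo(Class<?> type) {
        int mods = type.getModifiers();
        if (!Modifier.isPublic(mods)) {
            return false; // the class itself must be public
        }
        // Anonymous, local, and non-static inner classes are ruled out.
        if (type.isAnonymousClass() || type.isLocalClass()
                || (type.isMemberClass() && !Modifier.isStatic(mods))) {
            return false;
        }
        try {
            type.getConstructor(); // a public no-argument constructor must exist
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Walk the class hierarchy and inspect every instance field.
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (Modifier.isStatic(fm) || Modifier.isTransient(fm)) {
                    continue;
                }
                if (!Modifier.isPublic(fm) && !hasAccessors(type, f)) {
                    return false;
                }
            }
        }
        return true;
    }

    /** Simplified accessor lookup: only getX/setX, no "is" prefix for booleans. */
    private static boolean hasAccessors(Class<?> type, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Sensor plain = new Sensor();
        Sensor anonymous = new Sensor() {};
        System.out.println(looksLikePojo(plain.getClass()));     // true
        System.out.println(looksLikePojo(anonymous.getClass())); // false
    }
}
```

The check is applied to the runtime class of the object rather than to `Sensor.class`, because an object created with `new Sensor() {}` has a different runtime class than `Sensor` itself.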
Thanks again Timo, I hope I replied correctly this time.

As per my previous message, the Sensor class is a very simple POJO type (I think).

When the serialization trace talks about PGSql stuff, it makes me think that something from my operator is being included in serialization, not just the Sensor object itself, which I am explicitly including in state.

package sensingfeeling.functions.mapping;

public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {

    private static final long serialVersionUID = 8582433437601788991L;

    private transient ValueState<Sensor> sensorState;

    @Override
    public TypeA join(TypeB frame, TypeC activeMotionPaths) throws JsonProcessingException {

        Sensor sensor = sensorState.value();
        if (sensor == null) {
            LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
            sensor = getSensor(frame);
            sensorState.update(sensor);
        }

        return new TypeA();
    }

    @Override
    public void open(Configuration config) {
        LOG.debug("Sensor open method called", config);

        StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
            .cleanupInBackground()
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();

        ValueStateDescriptor<Sensor> sensorStateDescriptor = new ValueStateDescriptor<>(
            "sensor", TypeInformation.of(new TypeHint<Sensor>(){}));
        // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
        sensorState = getRuntimeContext().getState(sensorStateDescriptor);
    }

    private Sensor getSensor(TypeB frame) throws Exception {

        Class.forName("org.postgresql.Driver");
        try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SELECT * from sensor where sensorid = '" + frame.sensorId + "'")) {

            if (rs.next()) {
                Sensor sensor = new Sensor() {};

                LOG.debug("Got sensor" + sensor);

                return sensor;
            }

        } catch (SQLException ex) {
            LOG.error("Error when connection postgres", ex);
            throw ex;
        }

        return null;
    }
}

Above is a cut-down version of my operator. I'm guessing it is the ResultSet rs that is getting serialized. How do I prevent this undesirable behaviour? I'm quite happy for my solution to serialize only what I explicitly tell it to; I don't need exactly-once or anything.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595
Hi Chris,
your observation is right. By writing `new Sensor() {}` instead of just `new Sensor()`, you are creating an anonymous, non-static class that references the enclosing method and class.

If you check your logs, there might also be a reason given for why your POJO is treated as a generic type. I assume it is because you declared `implements Serializable`, which forces Flink to think "the user wants to deal with serialization of the POJO themselves".

Regards,
Timo
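The effect Timo describes can be reproduced with plain Java. The sketch below is a minimal illustration, not the code from the thread: `AnonymousCaptureDemo`, `FakeResultSet`, and both factory methods are hypothetical names, and `FakeResultSet` merely stands in for a JDBC `ResultSet`. It assumes the anonymous class body in the real operator read from `rs` (the `val$rs` entry in the serialization trace suggests this, but the posted code was cut down). An anonymous subclass that uses a local variable stores it in a hidden field, so a field-walking serializer such as Kryo's `FieldSerializer` follows it into the connection; a plain `new Sensor()` with copied values carries nothing extra.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class AnonymousCaptureDemo {

    /** Cut-down stand-in for the Sensor class from the thread. */
    public static class Sensor {
        public String sensorId;
    }

    /** Hypothetical stand-in for a heavyweight resource such as a JDBC ResultSet. */
    public static class FakeResultSet {
        public String getString(String column) {
            return "sensor-1";
        }
    }

    /** Problematic: the anonymous subclass body reads rs, so rs is captured in a hidden field. */
    public static Sensor anonymousSensor(FakeResultSet rs) {
        return new Sensor() {{ sensorId = rs.getString("sensorid"); }};
    }

    /** Fixed: a plain Sensor instance; the value is copied and nothing is captured. */
    public static Sensor plainSensor(FakeResultSet rs) {
        Sensor sensor = new Sensor();
        sensor.sensorId = rs.getString("sensorid");
        return sensor;
    }

    /** Lists the types of the fields declared directly on the object's runtime class. */
    public static List<Class<?>> declaredFieldTypes(Object o) {
        List<Class<?>> types = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            types.add(f.getType());
        }
        return types;
    }

    public static void main(String[] args) {
        FakeResultSet rs = new FakeResultSet();
        Sensor bad = anonymousSensor(rs);
        Sensor good = plainSensor(rs);

        // The anonymous instance is a hidden subclass that holds on to rs.
        System.out.println(bad.getClass().isAnonymousClass() + " " + declaredFieldTypes(bad));
        // The plain instance is exactly a Sensor.
        System.out.println(good.getClass() == Sensor.class);
    }
}
```

Both objects end up with the same `sensorId`; only their runtime classes differ, and that difference determines what a serializer sees.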
Thanks so much Timo, got it working now. All down to my lack of Java skill.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595