Hi,
I am using the flink training exercise TaxiRide [1] to execute a stream count of events. On the cluster and on my local machine I am receiving the message that joda.Time cannot be serialized "class org.joda.time.LocalDateTime is not a valid POJO type". However it is starting the job on the cluster, but not in my local machine. So I searched in the internet and it is requested to register the jodaTime class on the environment[2]. I did like this: env.getConfig().registerTypeWithKryoSerializer(DateTime.class, AvroKryoSerializerUtils.JodaDateTimeSerializer.class); env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class); env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class); and I added the joda and avro dependency on the pom.xml: <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-avro</artifactId> <version>${project.version}</version> </dependency> I also tested using addDefaultKryoSerializer but I got the same error. For some reason, it is still not working. Does anyone have some hint of what could be happening? Thanks! Felipe [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com |
Hi Felipe, I tested the basic RideCleansingExercise[1] jobs that uses the TaxiRide type locally and it seems to be able to startup normally. Could you also share your current executing code and the full stacktrace of the exception ? Best, Yun
|
Hi Yun, it says an INFO "class org.joda.time.DateTime cannot be used
as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance", however I cannot submit my job. It is strange because I can start and run it on Intellij, but not on the standalone cluster in my machine. 2020-06-13 10:50:56,051 INFO org.apache.flink.core.fs.FileSystem - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available. 2020-06-13 10:50:56,143 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath. 2020-06-13 10:50:56,164 INFO org.apache.flink.runtime.security.modules.JaasModule - Jaas file will be created as /tmp/jaas-837993701496785981.conf. 2020-06-13 10:50:56,169 INFO org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath. 2020-06-13 10:50:56,169 WARN org.apache.flink.runtime.security.SecurityUtils - Unable to install incompatible security context factory org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory 2020-06-13 10:50:56,171 INFO org.apache.flink.client.cli.CliFrontend - Running 'run' command. 2020-06-13 10:50:56,242 INFO org.apache.flink.client.cli.CliFrontend - Building program from JAR file 2020-06-13 10:50:57,084 INFO org.apache.flink.client.ClientUtils - Starting program (detached: false) 2020-06-13 10:50:59,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a getter for field iMillis 2020-06-13 10:50:59,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a setter for field iMillis 2020-06-13 10:50:59,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.joda.time.DateTime cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 2020-06-13 10:50:59,028 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a getter for field iMillis 2020-06-13 10:50:59,028 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a setter for field iMillis 2020-06-13 10:50:59,028 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.joda.time.DateTime cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 2020-06-13 10:53:19,508 WARN org.apache.flink.util.ExecutorUtils - ExecutorService did not terminate in time. Shutting it down now. 2020-06-13 10:53:19,510 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command. org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966) Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1764) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643) at org.apache.flink.streaming.examples.aggregate.TaxiRideCountPreAggregate.main(TaxiRideCountPreAggregate.java:136) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) ... 8 more Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1759) ... 17 more Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files. at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:169) at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119) at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not upload job files. at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80) at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:167) ... 7 more Caused by: java.io.IOException: Could not connect to BlobServer at address localhost/192.168.56.1:35193 at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100) at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:167) at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:76) ... 8 more Caused by: java.net.ConnectException: Connection timed out (Connection timed out) at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:607) at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95) ... 10 more ] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ... 4 more -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Sat, Jun 13, 2020 at 5:08 AM Yun Gao <[hidden email]> wrote: > > Hi Felipe, > > I tested the basic RideCleansingExercise[1] jobs that uses the TaxiRide type locally and it seems to be able to startup normally. > > Could you also share your current executing code and the full stacktrace of the exception ? > > Best, > Yun > > [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java > > ------------------Original Mail ------------------ > Sender:Felipe Gutierrez <[hidden email]> > Send Date:Fri Jun 12 23:11:28 2020 > Recipients:user <[hidden email]> > Subject:How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example? >> >> Hi, >> >> I am using the flink training exercise TaxiRide [1] to execute a >> stream count of events. On the cluster and on my local machine I am >> receiving the message that joda.Time cannot be serialized "class >> org.joda.time.LocalDateTime is not a valid POJO type". However it is >> starting the job on the cluster, but not in my local machine. So I >> searched in the internet and it is requested to register the jodaTime >> class on the environment[2]. I did like this: >> >> env.getConfig().registerTypeWithKryoSerializer(DateTime.class, >> AvroKryoSerializerUtils.JodaDateTimeSerializer.class); >> env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, >> AvroKryoSerializerUtils.JodaLocalDateSerializer.class); >> env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, >> AvroKryoSerializerUtils.JodaLocalTimeSerializer.class); >> >> and I added the joda and avro dependency on the pom.xml: >> >> <dependency> >> <groupId>joda-time</groupId> >> <artifactId>joda-time</artifactId> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-avro</artifactId> >> <version>${project.version}</version> >> </dependency> >> >> I also tested using addDefaultKryoSerializer but I got the same error. >> For some reason, it is still not working. Does anyone have some hint >> of what could be happening? >> >> Thanks! Felipe >> [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java >> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html >> >> -- >> -- Felipe Gutierrez >> -- skype: felipe.o.gutierrez >> -- https://felipeogutierrez.blogspot.com |
Hi, I tried to change the joda.time maven version to be the same of
the flink-training example and I am getting this error on IntelliJ. Maybe it is more precislyL 2020-06-13 12:04:27,333 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 2020-06-13 12:04:27,333 INFO org.apache.flink.runtime.taskmanager.Task [] - reducer -> flat-output -> Sink: sink (4/4) (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING. 2020-06-13 12:04:27,337 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - reducer -> flat-output -> Sink: sink (4/4) (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING. 2020-06-13 12:04:27,360 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 2020-06-13 12:04:27,360 INFO org.apache.flink.runtime.taskmanager.Task [] - reducer -> flat-output -> Sink: sink (3/4) (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING. 2020-06-13 12:04:27,362 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - reducer -> flat-output -> Sink: sink (3/4) (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING. 2020-06-13 12:04:27,376 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory. 2020-06-13 12:04:27,381 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory. 2020-06-13 12:04:27,381 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory. 2020-06-13 12:04:27,389 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory. 2020-06-13 12:04:27,511 WARN org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Falling back to default Kryo serializer because Chill serializer couldn't be found. java.lang.reflect.InvocationTargetException: null at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?] at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?] at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:436) [classes/:?] at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:454) [classes/:?] at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:289) [classes/:?] at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175) [classes/:?] at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) [classes/:?] at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) [classes/:?] at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71) [classes/:?] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117) [classes/:?] at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) [classes/:?] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) [classes/:?] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) [classes/:?] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) [classes/:?] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [classes/:?] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [classes/:?] at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [classes/:?] at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [classes/:?] at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.generateTaxiRideArray(TaxiRideSource.java:114) [classes/:?] at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.run(TaxiRideSource.java:96) [classes/:?] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [classes/:?] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [classes/:?] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208) [classes/:?] Caused by: com.esotericsoftware.kryo.KryoException: Unable to resolve type variable: A at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:114) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:86) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.util.GenericsUtil.resolveType(GenericsUtil.java:41) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:263) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:242) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.serializers.CachedFields.addField(CachedFields.java:139) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.serializers.CachedFields.rebuild(CachedFields.java:99) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:82) ~[kryo-5.0.0-RC1.jar:?] at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:68) ~[kryo-5.0.0-RC1.jar:?] at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:93) ~[classes/:?] at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98) ~[classes/:?] at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172) ~[classes/:?] at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84) ~[classes/:?] ... 25 more -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Sat, Jun 13, 2020 at 10:57 AM Felipe Gutierrez <[hidden email]> wrote: > > Hi Yun, it says an INFO "class org.joda.time.DateTime cannot be used > as a POJO type because not all fields are valid POJO fields, and must > be processed as GenericType. Please read the Flink documentation on > "Data Types & Serialization" for details of the effect on > performance", however I cannot submit my job. It is strange because I > can start and run it on Intellij, but not on the standalone cluster in > my machine. > > 2020-06-13 10:50:56,051 INFO org.apache.flink.core.fs.FileSystem > - Hadoop is not in the classpath/dependencies. > The extended set of supported File Systems via Hadoop is not > available. > 2020-06-13 10:50:56,143 INFO > org.apache.flink.runtime.security.modules.HadoopModuleFactory - > Cannot create Hadoop Security Module because Hadoop cannot be found in > the Classpath. > 2020-06-13 10:50:56,164 INFO > org.apache.flink.runtime.security.modules.JaasModule - Jaas > file will be created as /tmp/jaas-837993701496785981.conf. > 2020-06-13 10:50:56,169 INFO > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory > - Cannot install HadoopSecurityContext because Hadoop cannot be found > in the Classpath. > 2020-06-13 10:50:56,169 WARN > org.apache.flink.runtime.security.SecurityUtils - Unable > to install incompatible security context factory > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory > 2020-06-13 10:50:56,171 INFO org.apache.flink.client.cli.CliFrontend > - Running 'run' command. > 2020-06-13 10:50:56,242 INFO org.apache.flink.client.cli.CliFrontend > - Building program from JAR file > 2020-06-13 10:50:57,084 INFO org.apache.flink.client.ClientUtils > - Starting program (detached: false) > 2020-06-13 10:50:59,021 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - class > org.joda.time.DateTime does not contain a getter for field iMillis > 2020-06-13 10:50:59,021 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - class > org.joda.time.DateTime does not contain a setter for field iMillis > 2020-06-13 10:50:59,021 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - Class > class org.joda.time.DateTime cannot be used as a POJO type because not > all fields are valid POJO fields, and must be processed as > GenericType. Please read the Flink documentation on "Data Types & > Serialization" for details of the effect on performance. > 2020-06-13 10:50:59,028 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - class > org.joda.time.DateTime does not contain a getter for field iMillis > 2020-06-13 10:50:59,028 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - class > org.joda.time.DateTime does not contain a setter for field iMillis > 2020-06-13 10:50:59,028 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - Class > class org.joda.time.DateTime cannot be used as a POJO type because not > all fields are valid POJO fields, and must be processed as > GenericType. Please read the Flink documentation on "Data Types & > Serialization" for details of the effect on performance. > 2020-06-13 10:53:19,508 WARN org.apache.flink.util.ExecutorUtils > - ExecutorService did not terminate in time. > Shutting it down now. > 2020-06-13 10:53:19,510 ERROR org.apache.flink.client.cli.CliFrontend > - Error while running the command. > org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobSubmissionException: Failed to > submit JobGraph. > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148) > at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893) > at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966) > at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966) > Caused by: java.lang.RuntimeException: > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobSubmissionException: Failed to > submit JobGraph. > at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276) > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1764) > at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106) > at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72) > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643) > at org.apache.flink.streaming.examples.aggregate.TaxiRideCountPreAggregate.main(TaxiRideCountPreAggregate.java:136) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) > ... 8 more > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobSubmissionException: Failed to > submit JobGraph. > at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1759) > ... 17 more > Caused by: org.apache.flink.runtime.client.JobSubmissionException: > Failed to submit JobGraph. > at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359) > at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) > at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) > at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274) > at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) > at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.runtime.rest.util.RestClientException: > [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not > upload job files. > at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:169) > at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119) > at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084) > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkException: Could not upload job files. > at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80) > at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:167) > ... 7 more > Caused by: java.io.IOException: Could not connect to BlobServer at > address localhost/192.168.56.1:35193 > at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100) > at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:167) > at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:76) > ... 8 more > Caused by: java.net.ConnectException: Connection timed out (Connection > timed out) > at java.net.PlainSocketImpl.socketConnect(Native Method) > at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) > at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) > at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) > at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) > at java.net.Socket.connect(Socket.java:607) > at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95) > ... 10 more > ] > at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) > at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) > at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) > ... 4 more > -- > -- Felipe Gutierrez > -- skype: felipe.o.gutierrez > -- https://felipeogutierrez.blogspot.com > > On Sat, Jun 13, 2020 at 5:08 AM Yun Gao <[hidden email]> wrote: > > > > Hi Felipe, > > > > I tested the basic RideCleansingExercise[1] jobs that uses the TaxiRide type locally and it seems to be able to startup normally. > > > > Could you also share your current executing code and the full stacktrace of the exception ? > > > > Best, > > Yun > > > > [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java > > > > ------------------Original Mail ------------------ > > Sender:Felipe Gutierrez <[hidden email]> > > Send Date:Fri Jun 12 23:11:28 2020 > > Recipients:user <[hidden email]> > > Subject:How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example? > >> > >> Hi, > >> > >> I am using the flink training exercise TaxiRide [1] to execute a > >> stream count of events. On the cluster and on my local machine I am > >> receiving the message that joda.Time cannot be serialized "class > >> org.joda.time.LocalDateTime is not a valid POJO type". However it is > >> starting the job on the cluster, but not in my local machine. So I > >> searched in the internet and it is requested to register the jodaTime > >> class on the environment[2]. I did like this: > >> > >> env.getConfig().registerTypeWithKryoSerializer(DateTime.class, > >> AvroKryoSerializerUtils.JodaDateTimeSerializer.class); > >> env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, > >> AvroKryoSerializerUtils.JodaLocalDateSerializer.class); > >> env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, > >> AvroKryoSerializerUtils.JodaLocalTimeSerializer.class); > >> > >> and I added the joda and avro dependency on the pom.xml: > >> > >> <dependency> > >> <groupId>joda-time</groupId> > >> <artifactId>joda-time</artifactId> > >> </dependency> > >> <dependency> > >> <groupId>org.apache.flink</groupId> > >> <artifactId>flink-avro</artifactId> > >> <version>${project.version}</version> > >> </dependency> > >> > >> I also tested using addDefaultKryoSerializer but I got the same error. > >> For some reason, it is still not working. Does anyone have some hint > >> of what could be happening? > >> > >> Thanks! Felipe > >> [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java > >> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html > >> > >> -- > >> -- Felipe Gutierrez > >> -- skype: felipe.o.gutierrez > >> -- https://felipeogutierrez.blogspot.com |
Hi Felipe, the problem why you cannot submit a job to the Flink cluster is that the client cannot reach the blob server: Caused by: java.io.IOException: Could not connect to BlobServer at address localhost/192.168.56.1:35193 Could you check whether the cluster has been properly started and is reachable under 192.168.56.1:35193? You could also share the cluster logs with us to further debug the problem. Cheers, Till On Sat, Jun 13, 2020 at 12:09 PM Felipe Gutierrez <[hidden email]> wrote: Hi, I tried to change the joda.time maven version to be the same of |
yes. again it trapped me. It was the /etc/hosts that I change when I
am using VMs. Now, even with the INFO "INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a getter for field Millis" my program is running. Thanks! -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Sat, Jun 13, 2020 at 12:40 PM Till Rohrmann <[hidden email]> wrote: > > Hi Felipe, > > the problem why you cannot submit a job to the Flink cluster is that the client cannot reach the blob server: > > Caused by: java.io.IOException: Could not connect to BlobServer at address localhost/192.168.56.1:35193 > > Could you check whether the cluster has been properly started and is reachable under 192.168.56.1:35193? You could also share the cluster logs with us to further debug the problem. > > Cheers, > Till > > On Sat, Jun 13, 2020 at 12:09 PM Felipe Gutierrez <[hidden email]> wrote: >> >> Hi, I tried to change the joda.time maven version to be the same of >> the flink-training example and I am getting this error on IntelliJ. >> Maybe it is more precislyL >> >> 2020-06-13 12:04:27,333 INFO >> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No >> state backend has been configured, using default (Memory / JobManager) >> MemoryStateBackend (data in heap memory / checkpoints to JobManager) >> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, >> maxStateSize: 5242880) >> 2020-06-13 12:04:27,333 INFO >> org.apache.flink.runtime.taskmanager.Task [] - >> reducer -> flat-output -> Sink: sink (4/4) >> (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING. >> 2020-06-13 12:04:27,337 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - >> reducer -> flat-output -> Sink: sink (4/4) >> (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING. >> 2020-06-13 12:04:27,360 INFO >> org.apache.flink.streaming.runtime.tasks.StreamTask [] - No >> state backend has been configured, using default (Memory / JobManager) >> MemoryStateBackend (data in heap memory / checkpoints to JobManager) >> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, >> maxStateSize: 5242880) >> 2020-06-13 12:04:27,360 INFO >> org.apache.flink.runtime.taskmanager.Task [] - >> reducer -> flat-output -> Sink: sink (3/4) >> (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING. >> 2020-06-13 12:04:27,362 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - >> reducer -> flat-output -> Sink: sink (3/4) >> (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING. >> 2020-06-13 12:04:27,376 INFO >> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - >> Initializing heap keyed state backend with stream factory. >> 2020-06-13 12:04:27,381 INFO >> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - >> Initializing heap keyed state backend with stream factory. >> 2020-06-13 12:04:27,381 INFO >> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - >> Initializing heap keyed state backend with stream factory. >> 2020-06-13 12:04:27,389 INFO >> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - >> Initializing heap keyed state backend with stream factory. >> 2020-06-13 12:04:27,511 WARN >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - >> Falling back to default Kryo serializer because Chill serializer >> couldn't be found. >> java.lang.reflect.InvocationTargetException: null >> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?] >> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> ~[?:?] >> at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> ~[?:?] >> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?] >> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:436) >> [classes/:?] >> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:454) >> [classes/:?] >> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:289) >> [classes/:?] >> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175) >> [classes/:?] >> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) >> [classes/:?] >> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) >> [classes/:?] >> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71) >> [classes/:?] >> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117) >> [classes/:?] >> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) >> [classes/:?] >> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) >> [classes/:?] >> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) >> [classes/:?] >> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) >> [classes/:?] >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) >> [classes/:?] >> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) >> [classes/:?] >> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) >> [classes/:?] >> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) >> [classes/:?] >> at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.generateTaxiRideArray(TaxiRideSource.java:114) >> [classes/:?] >> at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.run(TaxiRideSource.java:96) >> [classes/:?] >> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) >> [classes/:?] >> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) >> [classes/:?] >> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208) >> [classes/:?] >> Caused by: com.esotericsoftware.kryo.KryoException: Unable to resolve >> type variable: A >> at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:114) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:86) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.util.GenericsUtil.resolveType(GenericsUtil.java:41) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:263) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:242) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.serializers.CachedFields.addField(CachedFields.java:139) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.serializers.CachedFields.rebuild(CachedFields.java:99) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:82) >> ~[kryo-5.0.0-RC1.jar:?] >> at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:68) >> ~[kryo-5.0.0-RC1.jar:?] >> at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:93) >> ~[classes/:?] >> at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98) >> ~[classes/:?] >> at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172) >> ~[classes/:?] >> at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84) >> ~[classes/:?] >> ... 25 more >> -- >> -- Felipe Gutierrez >> -- skype: felipe.o.gutierrez >> -- https://felipeogutierrez.blogspot.com >> >> On Sat, Jun 13, 2020 at 10:57 AM Felipe Gutierrez >> <[hidden email]> wrote: >> > >> > Hi Yun, it says an INFO "class org.joda.time.DateTime cannot be used >> > as a POJO type because not all fields are valid POJO fields, and must >> > be processed as GenericType. Please read the Flink documentation on >> > "Data Types & Serialization" for details of the effect on >> > performance", however I cannot submit my job. It is strange because I >> > can start and run it on Intellij, but not on the standalone cluster in >> > my machine. >> > >> > 2020-06-13 10:50:56,051 INFO org.apache.flink.core.fs.FileSystem >> > - Hadoop is not in the classpath/dependencies. >> > The extended set of supported File Systems via Hadoop is not >> > available. >> > 2020-06-13 10:50:56,143 INFO >> > org.apache.flink.runtime.security.modules.HadoopModuleFactory - >> > Cannot create Hadoop Security Module because Hadoop cannot be found in >> > the Classpath. >> > 2020-06-13 10:50:56,164 INFO >> > org.apache.flink.runtime.security.modules.JaasModule - Jaas >> > file will be created as /tmp/jaas-837993701496785981.conf. >> > 2020-06-13 10:50:56,169 INFO >> > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory >> > - Cannot install HadoopSecurityContext because Hadoop cannot be found >> > in the Classpath. >> > 2020-06-13 10:50:56,169 WARN >> > org.apache.flink.runtime.security.SecurityUtils - Unable >> > to install incompatible security context factory >> > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory >> > 2020-06-13 10:50:56,171 INFO org.apache.flink.client.cli.CliFrontend >> > - Running 'run' command. >> > 2020-06-13 10:50:56,242 INFO org.apache.flink.client.cli.CliFrontend >> > - Building program from JAR file >> > 2020-06-13 10:50:57,084 INFO org.apache.flink.client.ClientUtils >> > - Starting program (detached: false) >> > 2020-06-13 10:50:59,021 INFO >> > org.apache.flink.api.java.typeutils.TypeExtractor - class >> > org.joda.time.DateTime does not contain a getter for field iMillis >> > 2020-06-13 10:50:59,021 INFO >> > org.apache.flink.api.java.typeutils.TypeExtractor - class >> > org.joda.time.DateTime does not contain a setter for field iMillis >> > 2020-06-13 10:50:59,021 INFO >> > org.apache.flink.api.java.typeutils.TypeExtractor - Class >> > class org.joda.time.DateTime cannot be used as a POJO type because not >> > all fields are valid POJO fields, and must be processed as >> > GenericType. Please read the Flink documentation on "Data Types & >> > Serialization" for details of the effect on performance. >> > 2020-06-13 10:50:59,028 INFO >> > org.apache.flink.api.java.typeutils.TypeExtractor - class >> > org.joda.time.DateTime does not contain a getter for field iMillis >> > 2020-06-13 10:50:59,028 INFO >> > org.apache.flink.api.java.typeutils.TypeExtractor - class >> > org.joda.time.DateTime does not contain a setter for field iMillis >> > 2020-06-13 10:50:59,028 INFO >> > org.apache.flink.api.java.typeutils.TypeExtractor - Class >> > class org.joda.time.DateTime cannot be used as a POJO type because not >> > all fields are valid POJO fields, and must be processed as >> > GenericType. Please read the Flink documentation on "Data Types & >> > Serialization" for details of the effect on performance. >> > 2020-06-13 10:53:19,508 WARN org.apache.flink.util.ExecutorUtils >> > - ExecutorService did not terminate in time. >> > Shutting it down now. >> > 2020-06-13 10:53:19,510 ERROR org.apache.flink.client.cli.CliFrontend >> > - Error while running the command. >> > org.apache.flink.client.program.ProgramInvocationException: The main >> > method caused an error: java.util.concurrent.ExecutionException: >> > org.apache.flink.runtime.client.JobSubmissionException: Failed to >> > submit JobGraph. >> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) >> > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) >> > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148) >> > at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662) >> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) >> > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893) >> > at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966) >> > at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) >> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966) >> > Caused by: java.lang.RuntimeException: >> > java.util.concurrent.ExecutionException: >> > org.apache.flink.runtime.client.JobSubmissionException: Failed to >> > submit JobGraph. >> > at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276) >> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1764) >> > at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106) >> > at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72) >> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643) >> > at org.apache.flink.streaming.examples.aggregate.TaxiRideCountPreAggregate.main(TaxiRideCountPreAggregate.java:136) >> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> > at java.lang.reflect.Method.invoke(Method.java:498) >> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) >> > ... 8 more >> > Caused by: java.util.concurrent.ExecutionException: >> > org.apache.flink.runtime.client.JobSubmissionException: Failed to >> > submit JobGraph. >> > at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) >> > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) >> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1759) >> > ... 17 more >> > Caused by: org.apache.flink.runtime.client.JobSubmissionException: >> > Failed to submit JobGraph. >> > at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359) >> > at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) >> > at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) >> > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) >> > at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) >> > at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274) >> > at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) >> > at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) >> > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) >> > at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) >> > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) >> > at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) >> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> > at java.lang.Thread.run(Thread.java:748) >> > Caused by: org.apache.flink.runtime.rest.util.RestClientException: >> > [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not >> > upload job files. >> > at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:169) >> > at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119) >> > at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084) >> > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) >> > at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) >> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> > at java.lang.Thread.run(Thread.java:748) >> > Caused by: org.apache.flink.util.FlinkException: Could not upload job files. >> > at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80) >> > at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:167) >> > ... 7 more >> > Caused by: java.io.IOException: Could not connect to BlobServer at >> > address localhost/192.168.56.1:35193 >> > at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100) >> > at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:167) >> > at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:76) >> > ... 8 more >> > Caused by: java.net.ConnectException: Connection timed out (Connection >> > timed out) >> > at java.net.PlainSocketImpl.socketConnect(Native Method) >> > at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) >> > at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) >> > at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) >> > at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) >> > at java.net.Socket.connect(Socket.java:607) >> > at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95) >> > ... 10 more >> > ] >> > at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) >> > at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) >> > at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) >> > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) >> > ... 4 more >> > -- >> > -- Felipe Gutierrez >> > -- skype: felipe.o.gutierrez >> > -- https://felipeogutierrez.blogspot.com >> > >> > On Sat, Jun 13, 2020 at 5:08 AM Yun Gao <[hidden email]> wrote: >> > > >> > > Hi Felipe, >> > > >> > > I tested the basic RideCleansingExercise[1] jobs that uses the TaxiRide type locally and it seems to be able to startup normally. >> > > >> > > Could you also share your current executing code and the full stacktrace of the exception ? >> > > >> > > Best, >> > > Yun >> > > >> > > [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java >> > > >> > > ------------------Original Mail ------------------ >> > > Sender:Felipe Gutierrez <[hidden email]> >> > > Send Date:Fri Jun 12 23:11:28 2020 >> > > Recipients:user <[hidden email]> >> > > Subject:How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example? >> > >> >> > >> Hi, >> > >> >> > >> I am using the flink training exercise TaxiRide [1] to execute a >> > >> stream count of events. On the cluster and on my local machine I am >> > >> receiving the message that joda.Time cannot be serialized "class >> > >> org.joda.time.LocalDateTime is not a valid POJO type". However it is >> > >> starting the job on the cluster, but not in my local machine. So I >> > >> searched in the internet and it is requested to register the jodaTime >> > >> class on the environment[2]. I did like this: >> > >> >> > >> env.getConfig().registerTypeWithKryoSerializer(DateTime.class, >> > >> AvroKryoSerializerUtils.JodaDateTimeSerializer.class); >> > >> env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, >> > >> AvroKryoSerializerUtils.JodaLocalDateSerializer.class); >> > >> env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, >> > >> AvroKryoSerializerUtils.JodaLocalTimeSerializer.class); >> > >> >> > >> and I added the joda and avro dependency on the pom.xml: >> > >> >> > >> <dependency> >> > >> <groupId>joda-time</groupId> >> > >> <artifactId>joda-time</artifactId> >> > >> </dependency> >> > >> <dependency> >> > >> <groupId>org.apache.flink</groupId> >> > >> <artifactId>flink-avro</artifactId> >> > >> <version>${project.version}</version> >> > >> </dependency> >> > >> >> > >> I also tested using addDefaultKryoSerializer but I got the same error. >> > >> For some reason, it is still not working. Does anyone have some hint >> > >> of what could be happening? >> > >> >> > >> Thanks! Felipe >> > >> [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java >> > >> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html >> > >> >> > >> -- >> > >> -- Felipe Gutierrez >> > >> -- skype: felipe.o.gutierrez >> > >> -- https://felipeogutierrez.blogspot.com |
Great to hear that it is now working. Cheers, Till On Sat, Jun 13, 2020, 12:58 Felipe Gutierrez <[hidden email]> wrote: yes. again it trapped me. It was the /etc/hosts that I change when I |
Free forum by Nabble | Edit this page |