How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?


Felipe Gutierrez
Hi,

I am using the Flink training exercise TaxiRide [1] to run a streaming
count of events. Both on the cluster and on my local machine I get the
message that Joda-Time cannot be serialized: "class
org.joda.time.LocalDateTime is not a valid POJO type". The job
nevertheless starts on the cluster, but not on my local machine. Searching
the internet, I found that the Joda-Time classes need to be registered
with Kryo on the environment [2]. I did it like this:

env.getConfig().registerTypeWithKryoSerializer(DateTime.class, AvroKryoSerializerUtils.JodaDateTimeSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);

and I added the Joda-Time and Avro dependencies to the pom.xml:

<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${project.version}</version>
</dependency>

I also tested addDefaultKryoSerializer, but I got the same error.
For some reason it is still not working. Does anyone have a hint
about what could be happening?

Thanks! Felipe
[1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
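One workaround worth noting, if the Kryo registration keeps failing: drop the Joda types from the event class and store timestamps as epoch milliseconds, so Flink's built-in POJO serializer applies and no Kryo/Joda registration is needed at all. A minimal sketch; this `TaxiRideEvent` class and its fields are hypothetical, not the training repo's type:

```java
// Hypothetical event type (not the training repo's TaxiRide): with only
// public primitive fields and a public no-arg constructor, Flink treats it
// as a valid POJO and never falls back to Kryo for it.
public class TaxiRideEvent {
    public long rideId;
    public long eventTimeMillis; // epoch millis instead of org.joda.time.DateTime
    public boolean isStart;

    // Flink's POJO rules require a public no-argument constructor.
    public TaxiRideEvent() {}

    public TaxiRideEvent(long rideId, long eventTimeMillis, boolean isStart) {
        this.rideId = rideId;
        this.eventTimeMillis = eventTimeMillis;
        this.isStart = isStart;
    }
}
```

A DateTime can be converted at the source with `dateTime.getMillis()` and reconstructed downstream only where actually needed.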

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

Yun Gao
Hi Felipe,

   I tested the basic RideCleansingExercise [1] job, which uses the TaxiRide type, locally, and it starts up normally.

   Could you also share your current code and the full stack trace of the exception?

Best,
 Yun

 [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java

------------------Original Mail ------------------
Sender:Felipe Gutierrez <[hidden email]>
Send Date:Fri Jun 12 23:11:28 2020
Recipients:user <[hidden email]>
Subject:How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

Felipe Gutierrez
Hi Yun, the log shows the INFO message "class org.joda.time.DateTime
cannot be used as a POJO type because not all fields are valid POJO
fields, and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect
on performance"; however, I cannot submit my job. It is strange because
I can start and run it in IntelliJ, but not on the standalone cluster
on my machine.

2020-06-13 10:50:56,051 INFO  org.apache.flink.core.fs.FileSystem
                     - Hadoop is not in the classpath/dependencies.
The extended set of supported File Systems via Hadoop is not
available.
2020-06-13 10:50:56,143 INFO
org.apache.flink.runtime.security.modules.HadoopModuleFactory  -
Cannot create Hadoop Security Module because Hadoop cannot be found in
the Classpath.
2020-06-13 10:50:56,164 INFO
org.apache.flink.runtime.security.modules.JaasModule          - Jaas
file will be created as /tmp/jaas-837993701496785981.conf.
2020-06-13 10:50:56,169 INFO
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
 - Cannot install HadoopSecurityContext because Hadoop cannot be found
in the Classpath.
2020-06-13 10:50:56,169 WARN
org.apache.flink.runtime.security.SecurityUtils               - Unable
to install incompatible security context factory
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
2020-06-13 10:50:56,171 INFO  org.apache.flink.client.cli.CliFrontend
                     - Running 'run' command.
2020-06-13 10:50:56,242 INFO  org.apache.flink.client.cli.CliFrontend
                     - Building program from JAR file
2020-06-13 10:50:57,084 INFO  org.apache.flink.client.ClientUtils
                     - Starting program (detached: false)
2020-06-13 10:50:59,021 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
org.joda.time.DateTime does not contain a getter for field iMillis
2020-06-13 10:50:59,021 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
org.joda.time.DateTime does not contain a setter for field iMillis
2020-06-13 10:50:59,021 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - Class
class org.joda.time.DateTime cannot be used as a POJO type because not
all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
2020-06-13 10:50:59,028 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
org.joda.time.DateTime does not contain a getter for field iMillis
2020-06-13 10:50:59,028 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
org.joda.time.DateTime does not contain a setter for field iMillis
2020-06-13 10:50:59,028 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - Class
class org.joda.time.DateTime cannot be used as a POJO type because not
all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
2020-06-13 10:53:19,508 WARN  org.apache.flink.util.ExecutorUtils
                     - ExecutorService did not terminate in time.
Shutting it down now.
2020-06-13 10:53:19,510 ERROR org.apache.flink.client.cli.CliFrontend
                     - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.RuntimeException:
java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1764)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
at org.apache.flink.streaming.examples.aggregate.TaxiRideCountPreAggregate.main(TaxiRideCountPreAggregate.java:136)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1759)
... 17 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException:
Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException:
[org.apache.flink.runtime.rest.handler.RestHandlerException: Could not
upload job files.
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:169)
at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80)
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:167)
... 7 more
Caused by: java.io.IOException: Could not connect to BlobServer at
address localhost/192.168.56.1:35193
at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:167)
at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:76)
... 8 more
Caused by: java.net.ConnectException: Connection timed out (Connection
timed out)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
... 10 more
]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 4 more
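Note that the deepest cause in this trace is not serialization at all but a connection timeout to the BlobServer at localhost/192.168.56.1:35193 (a 192.168.56.x address typically belongs to a VirtualBox host-only adapter, which localhost should not resolve to). A quick, self-contained way to probe that port, with the host and port taken from the trace above as an example:

```java
import java.net.InetSocketAddress;
import java.net.Socket;

// Probe whether the BlobServer port is reachable from the submitting machine.
// 192.168.56.1:35193 is taken from the stack trace; substitute your own values.
public class BlobServerProbe {
    public static boolean reachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true; // TCP connect succeeded within the timeout
        } catch (Exception e) {
            return false; // refused, timed out, or unresolved host
        }
    }

    public static void main(String[] args) {
        System.out.println(reachable("192.168.56.1", 35193, 3000));
    }
}
```

If the probe fails, checking how `localhost` resolves in /etc/hosts is a sensible next step.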
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Sat, Jun 13, 2020 at 5:08 AM Yun Gao <[hidden email]> wrote:


Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

Felipe Gutierrez
Hi, I changed the joda-time Maven version to match the one in the
flink-training example and now I get this error in IntelliJ. Maybe it
is more precise:

2020-06-13 12:04:27,333 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No
state backend has been configured, using default (Memory / JobManager)
MemoryStateBackend (data in heap memory / checkpoints to JobManager)
(checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
maxStateSize: 5242880)
2020-06-13 12:04:27,333 INFO
org.apache.flink.runtime.taskmanager.Task                    [] -
reducer -> flat-output -> Sink: sink (4/4)
(bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,337 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
reducer -> flat-output -> Sink: sink (4/4)
(bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,360 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No
state backend has been configured, using default (Memory / JobManager)
MemoryStateBackend (data in heap memory / checkpoints to JobManager)
(checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
maxStateSize: 5242880)
2020-06-13 12:04:27,360 INFO
org.apache.flink.runtime.taskmanager.Task                    [] -
reducer -> flat-output -> Sink: sink (3/4)
(0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,362 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
reducer -> flat-output -> Sink: sink (3/4)
(0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,376 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,381 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,381 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,389 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,511 WARN
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] -
Falling back to default Kryo serializer because Chill serializer
couldn't be found.
java.lang.reflect.InvocationTargetException: null
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:436)
[classes/:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:454)
[classes/:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:289)
[classes/:?]
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
[classes/:?]
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
[classes/:?]
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[classes/:?]
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
[classes/:?]
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)
[classes/:?]
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
[classes/:?]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
[classes/:?]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
[classes/:?]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
[classes/:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
[classes/:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
[classes/:?]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
[classes/:?]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
[classes/:?]
at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.generateTaxiRideArray(TaxiRideSource.java:114)
[classes/:?]
at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.run(TaxiRideSource.java:96)
[classes/:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
[classes/:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
[classes/:?]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
[classes/:?]
Caused by: com.esotericsoftware.kryo.KryoException: Unable to resolve
type variable: A
at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:114)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:86)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.GenericsUtil.resolveType(GenericsUtil.java:41)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:263)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:242)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.serializers.CachedFields.addField(CachedFields.java:139)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.serializers.CachedFields.rebuild(CachedFields.java:99)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:82)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:68)
~[kryo-5.0.0-RC1.jar:?]
at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:93)
~[classes/:?]
at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98)
~[classes/:?]
at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172)
~[classes/:?]
at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84)
~[classes/:?]
... 25 more
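The kryo-5.0.0-RC1.jar in this trace is suspicious: Flink's Chill-based Scala registrars (FlinkScalaKryoInstantiator) are built against Flink's own bundled Kryo 2.x, so a Kryo 5 preview jar on the classpath would explain the `Unable to resolve type variable` failure. If `mvn dependency:tree -Dincludes=com.esotericsoftware` shows another dependency dragging it in, excluding the transitive Kryo is one possible fix; the groupId/artifactId below are placeholders for that offending dependency, not real coordinates:

```xml
<dependency>
    <!-- placeholders: replace with the dependency that pulls in Kryo 5 -->
    <groupId>some.group</groupId>
    <artifactId>some-artifact</artifactId>
    <exclusions>
        <exclusion>
            <groupId>com.esotericsoftware</groupId>
            <artifactId>kryo</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```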
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Sat, Jun 13, 2020 at 10:57 AM Felipe Gutierrez
<[hidden email]> wrote:

> ... 10 more
> ]
> at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
> at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
> at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
> ... 4 more
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Sat, Jun 13, 2020 at 5:08 AM Yun Gao <[hidden email]> wrote:
> >
> > Hi Felipe,
> >
> >    I tested the basic RideCleansingExercise[1] jobs that uses the TaxiRide type locally and it seems to be able to startup normally.
> >
> >    Could you also share your current executing code and the full stacktrace of the exception ?
> >
> > Best,
> >  Yun
> >
> >  [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java
> >
> > ------------------Original Mail ------------------
> > Sender:Felipe Gutierrez <[hidden email]>
> > Send Date:Fri Jun 12 23:11:28 2020
> > Recipients:user <[hidden email]>
> > Subject:How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

Till Rohrmann
Hi Felipe,

the reason you cannot submit a job to the Flink cluster is that the client cannot reach the BlobServer:

Caused by: java.io.IOException: Could not connect to BlobServer at address localhost/192.168.56.1:35193

Could you check whether the cluster has been started properly and is reachable at 192.168.56.1:35193? You could also share the cluster logs with us to debug the problem further.

Cheers,
Till
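
One way to make that endpoint predictable is to pin the relevant settings in `conf/flink-conf.yaml` (a sketch with illustrative values; the thread does not show the actual configuration, and by default `blob.server.port: 0` picks a random port like the 35193 seen in the error):

```yaml
# conf/flink-conf.yaml -- illustrative values, adjust to your setup
jobmanager.rpc.address: 192.168.56.1   # address the client must be able to reach
blob.server.port: 6130                 # fixed port instead of a random ephemeral one
rest.port: 8081                        # REST endpoint used for job submission
```

With a fixed port, a simple connectivity check from the client machine tells you whether the BlobServer is reachable at all.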

On Sat, Jun 13, 2020 at 12:09 PM Felipe Gutierrez <[hidden email]> wrote:
Hi, I tried to change the joda-time Maven version to match the one used
by the flink-training example, and now I am getting this error in
IntelliJ. Maybe this is more precise:

2020-06-13 12:04:27,333 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No
state backend has been configured, using default (Memory / JobManager)
MemoryStateBackend (data in heap memory / checkpoints to JobManager)
(checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
maxStateSize: 5242880)
2020-06-13 12:04:27,333 INFO
org.apache.flink.runtime.taskmanager.Task                    [] -
reducer -> flat-output -> Sink: sink (4/4)
(bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,337 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
reducer -> flat-output -> Sink: sink (4/4)
(bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,360 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No
state backend has been configured, using default (Memory / JobManager)
MemoryStateBackend (data in heap memory / checkpoints to JobManager)
(checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
maxStateSize: 5242880)
2020-06-13 12:04:27,360 INFO
org.apache.flink.runtime.taskmanager.Task                    [] -
reducer -> flat-output -> Sink: sink (3/4)
(0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,362 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
reducer -> flat-output -> Sink: sink (3/4)
(0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,376 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,381 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,381 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,389 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,511 WARN
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] -
Falling back to default Kryo serializer because Chill serializer
couldn't be found.
java.lang.reflect.InvocationTargetException: null
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:436)
[classes/:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:454)
[classes/:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:289)
[classes/:?]
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
[classes/:?]
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
[classes/:?]
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[classes/:?]
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
[classes/:?]
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)
[classes/:?]
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
[classes/:?]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
[classes/:?]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
[classes/:?]
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
[classes/:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
[classes/:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
[classes/:?]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
[classes/:?]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
[classes/:?]
at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.generateTaxiRideArray(TaxiRideSource.java:114)
[classes/:?]
at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.run(TaxiRideSource.java:96)
[classes/:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
[classes/:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
[classes/:?]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
[classes/:?]
Caused by: com.esotericsoftware.kryo.KryoException: Unable to resolve
type variable: A
at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:114)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:86)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.GenericsUtil.resolveType(GenericsUtil.java:41)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:263)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:242)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.serializers.CachedFields.addField(CachedFields.java:139)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.serializers.CachedFields.rebuild(CachedFields.java:99)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:82)
~[kryo-5.0.0-RC1.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:68)
~[kryo-5.0.0-RC1.jar:?]
at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:93)
~[classes/:?]
at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98)
~[classes/:?]
at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172)
~[classes/:?]
at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84)
~[classes/:?]
... 25 more
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

Felipe Gutierrez
Yes, it trapped me again. It was the /etc/hosts file, which I change
when I am using VMs. Now, even with the INFO message "class
org.joda.time.DateTime does not contain a getter for field iMillis", my
program is running. Thanks!
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
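
The /etc/hosts pitfall can be illustrated with a minimal sketch (the `resolve` helper and the hosts-file contents below are hypothetical, not from the thread): it mimics hosts-file lookup order to show how a leftover VM entry can shadow the loopback address, so that "localhost" resolves to 192.168.56.1 and the client times out connecting to the BlobServer.

```python
def resolve(hosts_text, name):
    """Return the first address mapped to `name`, as a hosts-file lookup would."""
    for line in hosts_text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and surrounding whitespace
        if not line:
            continue
        addr, *names = line.split()
        if name in names:
            return addr  # first match wins, like the resolver
    return None

# A stale VM entry placed before the loopback line shadows it.
broken_hosts = """\
192.168.56.1  localhost flink-vm   # leftover entry from a VM setup
127.0.0.1     localhost
"""

fixed_hosts = "127.0.0.1  localhost\n192.168.56.1  flink-vm\n"

print(resolve(broken_hosts, "localhost"))  # -> 192.168.56.1
print(resolve(fixed_hosts, "localhost"))   # -> 127.0.0.1
```

The fix is simply to keep `localhost` mapped to 127.0.0.1 and give the VM address its own hostname.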

>> at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:68)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:93)
>> ~[classes/:?]
>> at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98)
>> ~[classes/:?]
>> at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172)
>> ~[classes/:?]
>> at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84)
>> ~[classes/:?]
>> ... 25 more
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
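Underlying the whole thread is the TypeExtractor warning that org.joda.time.DateTime is not a valid POJO type, which forces Flink onto the Kryo path that fails above. Registering the Avro Joda serializers is one fix; another is to keep Joda types out of the event schema entirely, so Flink's faster built-in POJO serializer applies. A minimal sketch of that alternative, using a hypothetical event class (not the training repo's TaxiRide), which stores time as epoch millis:

```java
// Hypothetical event type sketched to satisfy Flink's POJO rules:
// a public class with a public no-arg constructor and public fields
// (or getters/setters). Storing timestamps as epoch millis keeps
// org.joda.time.DateTime out of the schema, so neither Kryo nor a
// registered Joda serializer is needed for this type.
public class RideEvent {
    public long rideId;
    public long startTimeMillis; // instead of org.joda.time.DateTime
    public boolean isStart;

    public RideEvent() {} // required for POJO serialization

    public RideEvent(long rideId, long startTimeMillis, boolean isStart) {
        this.rideId = rideId;
        this.startTimeMillis = startTimeMillis;
        this.isStart = isStart;
    }
}
```

With a type like this, the "cannot be used as a POJO type" INFO messages below disappear, because every field is a serializable primitive.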

Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

Till Rohrmann
Great to hear that it is now working.

Cheers,
Till

On Sat, Jun 13, 2020, 12:58 Felipe Gutierrez <[hidden email]> wrote:
Yes, it tripped me up again. It was the /etc/hosts file that I change when I
am using VMs. Now, even with the INFO message
"org.apache.flink.api.java.typeutils.TypeExtractor - class
org.joda.time.DateTime does not contain a getter for field iMillis", my
program is running. Thanks!
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
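The failure mode resolved here (the job runs in the IDE but submission to the standalone cluster times out) comes down to a plain TCP connect, which is essentially all the BlobClient does before uploading the job files. A small standalone sketch, not part of Flink's API, for checking whether the address the cluster advertises actually resolves and accepts connections; the host and port below are placeholders:

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

// Standalone sketch (not Flink API): mimics the TCP connect that the
// BlobClient performs when uploading the JobGraph's files.
public class BlobServerProbe {

    // Returns true if host:port accepts a TCP connection within the timeout.
    public static boolean reachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "localhost"; // placeholder
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 35193;
        // A stale /etc/hosts entry surfaces here as an unexpected address.
        System.out.println(host + " resolves to "
                + InetAddress.getByName(host).getHostAddress());
        System.out.println("reachable: " + reachable(host, port, 3000));
    }
}
```

If the resolved address is not the one the cluster is actually listening on, fixing the hosts entry (as done in this thread) is the cure.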

On Sat, Jun 13, 2020 at 12:40 PM Till Rohrmann <[hidden email]> wrote:
>
> Hi Felipe,
>
> the reason you cannot submit a job to the Flink cluster is that the client cannot reach the BlobServer:
>
> Caused by: java.io.IOException: Could not connect to BlobServer at address localhost/192.168.56.1:35193
>
> Could you check whether the cluster has been properly started and is reachable under 192.168.56.1:35193? You could also share the cluster logs with us to further debug the problem.
>
> Cheers,
> Till
>
> On Sat, Jun 13, 2020 at 12:09 PM Felipe Gutierrez <[hidden email]> wrote:
>>
>> Hi, I tried to change the joda-time Maven version to match the one in
>> the flink-training example, and I am getting this error in IntelliJ.
>> Maybe this is more precise:
>>
>> 2020-06-13 12:04:27,333 INFO
>> org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No
>> state backend has been configured, using default (Memory / JobManager)
>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>> maxStateSize: 5242880)
>> 2020-06-13 12:04:27,333 INFO
>> org.apache.flink.runtime.taskmanager.Task                    [] -
>> reducer -> flat-output -> Sink: sink (4/4)
>> (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
>> 2020-06-13 12:04:27,337 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
>> reducer -> flat-output -> Sink: sink (4/4)
>> (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
>> 2020-06-13 12:04:27,360 INFO
>> org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No
>> state backend has been configured, using default (Memory / JobManager)
>> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
>> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
>> maxStateSize: 5242880)
>> 2020-06-13 12:04:27,360 INFO
>> org.apache.flink.runtime.taskmanager.Task                    [] -
>> reducer -> flat-output -> Sink: sink (3/4)
>> (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
>> 2020-06-13 12:04:27,362 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
>> reducer -> flat-output -> Sink: sink (3/4)
>> (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
>> 2020-06-13 12:04:27,376 INFO
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
>> Initializing heap keyed state backend with stream factory.
>> 2020-06-13 12:04:27,381 INFO
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
>> Initializing heap keyed state backend with stream factory.
>> 2020-06-13 12:04:27,381 INFO
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
>> Initializing heap keyed state backend with stream factory.
>> 2020-06-13 12:04:27,389 INFO
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
>> Initializing heap keyed state backend with stream factory.
>> 2020-06-13 12:04:27,511 WARN
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] -
>> Falling back to default Kryo serializer because Chill serializer
>> couldn't be found.
>> java.lang.reflect.InvocationTargetException: null
>> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:?]
>> at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:?]
>> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:436)
>> [classes/:?]
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:454)
>> [classes/:?]
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:289)
>> [classes/:?]
>> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
>> [classes/:?]
>> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
>> [classes/:?]
>> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>> [classes/:?]
>> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
>> [classes/:?]
>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)
>> [classes/:?]
>> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>> [classes/:?]
>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>> [classes/:?]
>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>> [classes/:?]
>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>> [classes/:?]
>> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> [classes/:?]
>> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> [classes/:?]
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> [classes/:?]
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> [classes/:?]
>> at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.generateTaxiRideArray(TaxiRideSource.java:114)
>> [classes/:?]
>> at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.run(TaxiRideSource.java:96)
>> [classes/:?]
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> [classes/:?]
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> [classes/:?]
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>> [classes/:?]
>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to resolve
>> type variable: A
>> at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:114)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.util.GenericsUtil.resolveTypeVariable(GenericsUtil.java:86)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.util.GenericsUtil.resolveType(GenericsUtil.java:41)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:263)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.util.Generics$GenericType.initialize(Generics.java:242)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.util.Generics$GenericType.<init>(Generics.java:228)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.serializers.CachedFields.addField(CachedFields.java:139)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.serializers.CachedFields.rebuild(CachedFields.java:99)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:82)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:68)
>> ~[kryo-5.0.0-RC1.jar:?]
>> at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.useField$1(FlinkScalaKryoInstantiator.scala:93)
>> ~[classes/:?]
>> at org.apache.flink.runtime.types.ScalaCollectionsRegistrar.apply(FlinkScalaKryoInstantiator.scala:98)
>> ~[classes/:?]
>> at org.apache.flink.runtime.types.AllScalaRegistrar.apply(FlinkScalaKryoInstantiator.scala:172)
>> ~[classes/:?]
>> at org.apache.flink.runtime.types.FlinkScalaKryoInstantiator.newKryo(FlinkScalaKryoInstantiator.scala:84)
>> ~[classes/:?]
>> ... 25 more
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>> On Sat, Jun 13, 2020 at 10:57 AM Felipe Gutierrez
>> <[hidden email]> wrote:
>> >
>> > Hi Yun, it says an INFO "class org.joda.time.DateTime cannot be used
>> > as a POJO type because not all fields are valid POJO fields, and must
>> > be processed as GenericType. Please read the Flink documentation on
>> > "Data Types & Serialization" for details of the effect on
>> > performance", however I cannot submit my job. It is strange because I
>> > can start and run it on Intellij, but not on the standalone cluster in
>> > my machine.
>> >
>> > 2020-06-13 10:50:56,051 INFO  org.apache.flink.core.fs.FileSystem
>> >                      - Hadoop is not in the classpath/dependencies.
>> > The extended set of supported File Systems via Hadoop is not
>> > available.
>> > 2020-06-13 10:50:56,143 INFO
>> > org.apache.flink.runtime.security.modules.HadoopModuleFactory  -
>> > Cannot create Hadoop Security Module because Hadoop cannot be found in
>> > the Classpath.
>> > 2020-06-13 10:50:56,164 INFO
>> > org.apache.flink.runtime.security.modules.JaasModule          - Jaas
>> > file will be created as /tmp/jaas-837993701496785981.conf.
>> > 2020-06-13 10:50:56,169 INFO
>> > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
>> >  - Cannot install HadoopSecurityContext because Hadoop cannot be found
>> > in the Classpath.
>> > 2020-06-13 10:50:56,169 WARN
>> > org.apache.flink.runtime.security.SecurityUtils               - Unable
>> > to install incompatible security context factory
>> > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
>> > 2020-06-13 10:50:56,171 INFO  org.apache.flink.client.cli.CliFrontend
>> >                      - Running 'run' command.
>> > 2020-06-13 10:50:56,242 INFO  org.apache.flink.client.cli.CliFrontend
>> >                      - Building program from JAR file
>> > 2020-06-13 10:50:57,084 INFO  org.apache.flink.client.ClientUtils
>> >                      - Starting program (detached: false)
>> > 2020-06-13 10:50:59,021 INFO
>> > org.apache.flink.api.java.typeutils.TypeExtractor             - class
>> > org.joda.time.DateTime does not contain a getter for field iMillis
>> > 2020-06-13 10:50:59,021 INFO
>> > org.apache.flink.api.java.typeutils.TypeExtractor             - class
>> > org.joda.time.DateTime does not contain a setter for field iMillis
>> > 2020-06-13 10:50:59,021 INFO
>> > org.apache.flink.api.java.typeutils.TypeExtractor             - Class
>> > class org.joda.time.DateTime cannot be used as a POJO type because not
>> > all fields are valid POJO fields, and must be processed as
>> > GenericType. Please read the Flink documentation on "Data Types &
>> > Serialization" for details of the effect on performance.
>> > 2020-06-13 10:50:59,028 INFO
>> > org.apache.flink.api.java.typeutils.TypeExtractor             - class
>> > org.joda.time.DateTime does not contain a getter for field iMillis
>> > 2020-06-13 10:50:59,028 INFO
>> > org.apache.flink.api.java.typeutils.TypeExtractor             - class
>> > org.joda.time.DateTime does not contain a setter for field iMillis
>> > 2020-06-13 10:50:59,028 INFO
>> > org.apache.flink.api.java.typeutils.TypeExtractor             - Class
>> > class org.joda.time.DateTime cannot be used as a POJO type because not
>> > all fields are valid POJO fields, and must be processed as
>> > GenericType. Please read the Flink documentation on "Data Types &
>> > Serialization" for details of the effect on performance.
>> > 2020-06-13 10:53:19,508 WARN  org.apache.flink.util.ExecutorUtils
>> >                      - ExecutorService did not terminate in time.
>> > Shutting it down now.
>> > 2020-06-13 10:53:19,510 ERROR org.apache.flink.client.cli.CliFrontend
>> >                      - Error while running the command.
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: java.util.concurrent.ExecutionException:
>> > org.apache.flink.runtime.client.JobSubmissionException: Failed to
>> > submit JobGraph.
>> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
>> > at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>> > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>> > at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>> > at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>> > Caused by: java.lang.RuntimeException:
>> > java.util.concurrent.ExecutionException:
>> > org.apache.flink.runtime.client.JobSubmissionException: Failed to
>> > submit JobGraph.
>> > at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276)
>> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1764)
>> > at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106)
>> > at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72)
>> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
>> > at org.apache.flink.streaming.examples.aggregate.TaxiRideCountPreAggregate.main(TaxiRideCountPreAggregate.java:136)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:498)
>> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>> > ... 8 more
>> > Caused by: java.util.concurrent.ExecutionException:
>> > org.apache.flink.runtime.client.JobSubmissionException: Failed to
>> > submit JobGraph.
>> > at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1759)
>> > ... 17 more
>> > Caused by: org.apache.flink.runtime.client.JobSubmissionException:
>> > Failed to submit JobGraph.
>> > at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
>> > at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
>> > at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
>> > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> > at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>> > at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
>> > at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>> > at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>> > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> > at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>> > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>> > at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> > at java.lang.Thread.run(Thread.java:748)
>> > Caused by: org.apache.flink.runtime.rest.util.RestClientException:
>> > [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not
>> > upload job files.
>> > at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:169)
>> > at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
>> > at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
>> > at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> > at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> > at java.lang.Thread.run(Thread.java:748)
>> > Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
>> > at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80)
>> > at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:167)
>> > ... 7 more
>> > Caused by: java.io.IOException: Could not connect to BlobServer at
>> > address localhost/192.168.56.1:35193
>> > at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
>> > at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:167)
>> > at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:76)
>> > ... 8 more
>> > Caused by: java.net.ConnectException: Connection timed out (Connection
>> > timed out)
>> > at java.net.PlainSocketImpl.socketConnect(Native Method)
>> > at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>> > at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>> > at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>> > at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>> > at java.net.Socket.connect(Socket.java:607)
>> > at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
>> > ... 10 more
>> > ]
>> > at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
>> > at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
>> > at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>> > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>> > ... 4 more
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> > -- https://felipeogutierrez.blogspot.com
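[Editor's note: the root cause in the trace above is not Joda serialization at all; the client cannot reach the BlobServer advertised at 192.168.56.1:35193 (an address typical of a VirtualBox host-only adapter), so job-file upload times out. One quick way to confirm whether the advertised address is reachable from the client machine is a plain TCP probe; this is a minimal sketch using only the JDK, with the host and port taken from the stack trace above:]

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BlobServerProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: the address is not reachable from here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Address and port copied from the BlobClient error in the stack trace.
        System.out.println(isReachable("192.168.56.1", 35193, 2000));
    }
}
```

[If the probe fails, the usual suspects are the JobManager binding to the wrong network interface or a firewall; pinning the advertised address in the cluster configuration (e.g. `jobmanager.rpc.address`) to an interface the client can actually reach is one thing to try.]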
>> >
>> > On Sat, Jun 13, 2020 at 5:08 AM Yun Gao <[hidden email]> wrote:
>> > >
>> > > Hi Felipe,
>> > >
>> > >    I tested the basic RideCleansingExercise [1] job, which uses the TaxiRide type, locally, and it seems to start up normally.
>> > >
>> > >    Could you also share the code you are currently executing and the full stack trace of the exception?
>> > >
>> > > Best,
>> > >  Yun
>> > >
>> > >  [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java
>> > >
>> > > ------------------ Original Mail ------------------
>> > > Sender: Felipe Gutierrez <[hidden email]>
>> > > Send Date: Fri Jun 12 23:11:28 2020
>> > > Recipients: user <[hidden email]>
>> > > Subject: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
>> > >>
>> > >> Hi,
>> > >>
>> > >> I am using the Flink training exercise TaxiRide [1] to run a
>> > >> streaming count of events. On both the cluster and my local machine I
>> > >> get the message that the Joda-Time type cannot be serialized: "class
>> > >> org.joda.time.LocalDateTime is not a valid POJO type". However, the
>> > >> job starts on the cluster but not on my local machine. Searching the
>> > >> web, I found that the Joda-Time classes should be registered with the
>> > >> environment [2]. I did it like this:
>> > >>
>> > >> env.getConfig().registerTypeWithKryoSerializer(DateTime.class,
>> > >> AvroKryoSerializerUtils.JodaDateTimeSerializer.class);
>> > >> env.getConfig().registerTypeWithKryoSerializer(LocalDate.class,
>> > >> AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
>> > >> env.getConfig().registerTypeWithKryoSerializer(LocalTime.class,
>> > >> AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
>> > >>
>> > >> and I added the joda and avro dependency on the pom.xml:
>> > >>
>> > >> <dependency>
>> > >> <groupId>joda-time</groupId>
>> > >> <artifactId>joda-time</artifactId>
>> > >> </dependency>
>> > >> <dependency>
>> > >> <groupId>org.apache.flink</groupId>
>> > >> <artifactId>flink-avro</artifactId>
>> > >> <version>${project.version}</version>
>> > >> </dependency>
>> > >>
>> > >> I also tried addDefaultKryoSerializer, but I got the same error.
>> > >> For some reason, it is still not working. Does anyone have a hint
>> > >> about what could be happening?
>> > >>
>> > >> Thanks! Felipe
>> > >> [1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
>> > >> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
>> > >>
>> > >> --
>> > >> -- Felipe Gutierrez
>> > >> -- skype: felipe.o.gutierrez
>> > >> -- https://felipeogutierrez.blogspot.com
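[Editor's note: an alternative that side-steps Kryo registration entirely is to keep timestamps as primitive epoch milliseconds in the event type, so that Flink's built-in POJO serializer applies and no Joda type ever reaches the serializer. The sketch below is a simplified, hypothetical stand-in for TaxiRide (field names are illustrative, and java.time replaces Joda only for parsing at construction time):]

```java
import java.time.Instant;

public class TaxiRideLite {

    // Flink POJO requirements: public no-arg constructor and
    // public fields (or private fields with getters/setters).
    public long rideId;
    public long startTime; // epoch millis instead of org.joda.time.DateTime
    public boolean isStart;

    public TaxiRideLite() {}

    public TaxiRideLite(long rideId, String isoStartTime, boolean isStart) {
        this.rideId = rideId;
        // Parse the ISO-8601 timestamp once, at construction; the stored
        // field is a plain long, which the POJO serializer handles natively.
        this.startTime = Instant.parse(isoStartTime).toEpochMilli();
        this.isStart = isStart;
    }
}
```

[With fields like these, "is not a valid POJO type" should not appear and no registerTypeWithKryoSerializer calls are needed for the timestamp fields.]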