Anyway to read Cassandra as DataStream/DataSet in Flink?


James Yu
Hi,

I tried to treat Cassandra as the source of data in Flink with the information provided in the following links:
- https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink
- https://www.javatips.net/api/flink-master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java

I got the AsyncWaitOperator exception when I ran the task. According to the first link, this exception occurs due to a network problem. The strange thing, however, is that I am running Cassandra on my local VM with only 10 rows of data in the target table.

@Jicaar in the first link also mentions that switching from RichAsyncFunction to RichMapFunction avoids the AsyncWaitOperator exception. Can someone with similar experience share how to do this with RichMapFunction?

AsyncWaitOperator exception trace -->
02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1) (2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
  at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 2 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader)
contextClassLoader (java.lang.Thread)
threads (java.lang.ThreadGroup)
groups (java.lang.ThreadGroup)
threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
val$backingThreadFactory (com.google.common.util.concurrent.ThreadFactoryBuilder$1)
threadFactory (java.util.concurrent.ThreadPoolExecutor)
delegate (com.google.common.util.concurrent.MoreExecutors$ListeningDecorator)
blockingExecutor (com.datastax.driver.core.Cluster$Manager)
manager (com.datastax.driver.core.Host)
triedHosts (com.datastax.driver.core.ExecutionInfo)
info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:82) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505) ~[kryo-2.24.0.jar:na]
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:182) ~[flink-core-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 10 common frames omitted
Caused by: java.util.ConcurrentModificationException: null
  at java.util.Vector$Itr.checkForComodification(Vector.java:1184) ~[na:1.8.0_60]
  at java.util.Vector$Itr.next(Vector.java:1137) ~[na:1.8.0_60]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  ... 68 common frames omitted



James C.-C.Yu
+886988713275

Re: Anyway to read Cassandra as DataStream/DataSet in Flink?

Fabian Hueske-2
Hi James,

The answer to your question depends on your use case.
The async I/O approach (AsyncFunction) works if you have a DataStream that you would like to enrich with data from a Cassandra table, but not if you would like to create a DataStream from a Cassandra table.
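
To make the enrichment pattern concrete, here is a minimal sketch that uses only the Java standard library, so it runs without Flink or the Cassandra driver. Everything in it is an assumption for illustration: the class AsyncEnrichSketch, the Enriched type, and the in-memory map standing in for the table are all hypothetical. It shows one asynchronous lookup per input element, with each result copied into a plain value before it is handed on. That last step may be relevant here: the Kryo serialization trace in the question starts at ArrayBackedResultSet$SinglePage, which suggests that a driver ResultSet was emitted and Flink then tried to copy it together with the cluster and thread objects it references.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Stdlib-only sketch of async enrichment; all names here are hypothetical. */
public class AsyncEnrichSketch {

    /** Plain value type: the only thing that travels downstream. */
    public static final class Enriched {
        public final String key;
        public final String value;

        public Enriched(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    /**
     * Starts one asynchronous lookup per input key, then collects the results
     * in input order. Each result is copied into an Enriched value, so no
     * client-side handle (session, result set, thread pool) leaves this method.
     */
    public static List<Enriched> enrich(
            List<String> keys,
            Function<String, CompletableFuture<String>> lookup) {
        List<CompletableFuture<Enriched>> pending = new ArrayList<>();
        for (String key : keys) {
            // Fire the lookup now; convert to a plain value once it completes.
            pending.add(lookup.apply(key).thenApply(v -> new Enriched(key, v)));
        }
        List<Enriched> out = new ArrayList<>();
        for (CompletableFuture<Enriched> f : pending) {
            out.add(f.join()); // wait for this lookup to complete
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for the Cassandra table: an in-memory map.
        Map<String, String> table = new HashMap<>();
        table.put("k1", "v1");
        table.put("k2", "v2");
        Function<String, CompletableFuture<String>> lookup =
                key -> CompletableFuture.supplyAsync(
                        () -> table.getOrDefault(key, "<missing>"));
        // prints [k1=v1, k2=v2, k3=<missing>]
        System.out.println(enrich(Arrays.asList("k1", "k2", "k3"), lookup));
    }
}
```

In a real job the lookup would be the driver's asynchronous query, and the conversion to a plain value would happen inside the async function before the result is emitted. This sketch does not cover timeouts, error handling, or Flink's ordered and unordered result modes.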

The Flink code base contains a CassandraInputFormat [1], but I don't think it is extensively used. You might need to adjust it to your needs.
InputFormats are the source connectors for the DataSet API, but they can also be used from the DataStream API to read bounded data. However, InputFormats do not guarantee the order of emitted records. Hence, they are not well suited for time-based applications.

Best, Fabian


2018-03-29 23:09 GMT+02:00 James Yu <[hidden email]>: