Hi,
I tried to treat Cassandra as the source of data in Flink with the information provided in the following links:
- https://stackoverflow.com/questions/43067681/read-data- from-cassandra-for-processing- in-flink
- https://www.javatips.net/api/flink-master/flink-examples/ flink-examples-streaming/src/ main/java/org/apache/flink/ streaming/examples/async/ AsyncIOExample.java
I got the AsyncWaitOperator exception when I run the task. According the the 1st link, this exception occurs due to network problem. However, the strange thing is that I am running Cassandra on my local VM with only 10 rows of data in the target table.@Jicaar in 1st link also mentions that switching from RichAsyncFunction to RichMapFunction can avoid the AsyncWaitOperatorexception, can someone with similar experience share how to do it in RichMapFunction? AsyncWaitOperator exception trace -->02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1) ( 2809cef511194e612b2cc65510f78c 64) switched from RUNNING to FAILED. java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.at org.apache.flink.streaming.api.operators.async.Emitter. output(Emitter.java:137) [flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at org.apache.flink.streaming.api.operators.async.Emitter. run(Emitter.java:85) [flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60] Caused by: org.apache.flink.streaming.runtime.tasks. ExceptionInChainedOperatorExce ption: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$ CopyingChainingOutput. pushToOperator(OperatorChain. java:566) ~[flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at org.apache.flink.streaming.runtime.tasks.OperatorChain$ CopyingChainingOutput.collect( OperatorChain.java:524) ~[flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at org.apache.flink.streaming.runtime.tasks.OperatorChain$ CopyingChainingOutput.collect( OperatorChain.java:504) ~[flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at org.apache.flink.streaming.runtime.tasks.OperatorChain$ BroadcastingOutputCollector. collect(OperatorChain.java: 611) ~[flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at org.apache.flink.streaming.runtime.tasks.OperatorChain$ BroadcastingOutputCollector. collect(OperatorChain.java: 572) ~[flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at org.apache.flink.streaming.api.operators. AbstractStreamOperator$ CountingOutput.collect( AbstractStreamOperator.java: 830) ~[flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at org.apache.flink.streaming.api.operators. AbstractStreamOperator$ CountingOutput.collect( AbstractStreamOperator.java: 808) ~[flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at org.apache.flink.streaming.api.operators. TimestampedCollector.collect( TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1. 4.2.jar:1.4.2] at org.apache.flink.streaming.api.operators.async.Emitter. output(Emitter.java:133) [flink-streaming-java_2.11-1. 4.2.jar:1.4.2] ... 2 common frames omittedCaused by: com.esotericsoftware.kryo.KryoException: java.util. ConcurrentModificationExceptio n Serialization trace:classes (sun.misc.Launcher$AppClassLoader) classloader (java.security.ProtectionDomain) context (java.security.AccessControlContext) acc (org.apache.flink.runtime.execution.librarycache. FlinkUserCodeClassLoaders$ ChildFirstClassLoader) contextClassLoader (java.lang.Thread)threads (java.lang.ThreadGroup)groups (java.lang.ThreadGroup)threadGroup (io.netty.util.concurrent.DefaultThreadFactory) val$backingThreadFactory (com.google.common.util.concurrent. ThreadFactoryBuilder$1) threadFactory (java.util.concurrent.ThreadPoolExecutor) delegate (com.google.common.util.concurrent.MoreExecutors$ ListeningDecorator) blockingExecutor (com.datastax.driver.core.Cluster$Manager) manager (com.datastax.driver.core.Host) triedHosts (com.datastax.driver.core.ExecutionInfo) info (com.datastax.driver.core.ArrayBackedResultSet$ SinglePage) at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:82) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo. java:599) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers. DefaultArraySerializers$ ObjectArraySerializer.write( DefaultArraySerializers.java: 348) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers. DefaultArraySerializers$ ObjectArraySerializer.write( DefaultArraySerializers.java: 289) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo. java:577) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:68) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo. java:599) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers. DefaultArraySerializers$ ObjectArraySerializer.write( DefaultArraySerializers.java: 348) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers. DefaultArraySerializers$ ObjectArraySerializer.write( DefaultArraySerializers.java: 289) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo. java:599) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers. DefaultArraySerializers$ ObjectArraySerializer.write( DefaultArraySerializers.java: 348) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers. DefaultArraySerializers$ ObjectArraySerializer.write( DefaultArraySerializers.java: 289) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo. java:599) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers. CollectionSerializer.write( CollectionSerializer.java:82) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers. CollectionSerializer.write( CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer. write(FieldSerializer.java: 495) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 505) ~[kryo-2.24.0.jar:na] at org.apache.flink.api.java.typeutils.runtime.kryo. KryoSerializer.copy( KryoSerializer.java:182) ~[flink-core-1.4.2.jar:1.4.2] at org.apache.flink.streaming.runtime.tasks.OperatorChain$ CopyingChainingOutput. pushToOperator(OperatorChain. java:547) ~[flink-streaming-java_2.11-1. 4.2.jar:1.4.2] ... 10 common frames omittedCaused by: java.util.ConcurrentModificationExceptio n: null at java.util.Vector$Itr.checkForComodification(Vector. java:1184) ~[na:1.8.0_60] at java.util.Vector$Itr.next(Vector.java:1137) ~[na:1.8.0_60] at com.esotericsoftware.kryo.serializers. CollectionSerializer.write( CollectionSerializer.java:74) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers. CollectionSerializer.write( CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java: 523) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.serializers.ObjectField.write( ObjectField.java:61) ~[kryo-2.24.0.jar:na] ... 68 common frames omitted
This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275
Free forum by Nabble | Edit this page |