Hi,
I tried to use Cassandra as a data source in Flink, following the information in these links:
- https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink
- https://www.javatips.net/api/flink-master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java

When I run the task, I get an AsyncWaitOperator exception. According to the first link, this exception is caused by a network problem. The strange thing is that I am running Cassandra on my local VM, with only 10 rows of data in the target table.

AsyncWaitOperator exception trace -->
02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1) (2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
    at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    ... 2 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader)
contextClassLoader (java.lang.Thread)
threads (java.lang.ThreadGroup)
groups (java.lang.ThreadGroup)
threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
val$backingThreadFactory (com.google.common.util.concurrent.ThreadFactoryBuilder$1)
threadFactory (java.util.concurrent.ThreadPoolExecutor)
delegate (com.google.common.util.concurrent.MoreExecutors$ListeningDecorator)
blockingExecutor (com.datastax.driver.core.Cluster$Manager)
manager (com.datastax.driver.core.Host)
triedHosts (com.datastax.driver.core.ExecutionInfo)
info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:82) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505) ~[kryo-2.24.0.jar:na]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:182) ~[flink-core-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    ... 10 common frames omitted
Caused by: java.util.ConcurrentModificationException: null
    at java.util.Vector$Itr.checkForComodification(Vector.java:1184) ~[na:1.8.0_60]
    at java.util.Vector$Itr.next(Vector.java:1137) ~[na:1.8.0_60]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
    ... 68 common frames omitted

-----------------------------------------------
James C.-C.Yu
+886988713275
Hi James,

The answer to your question depends on your use case. The AsyncIOFunction approach works if you have a DataStream that you would like to enrich with data from a Cassandra table, but not if you would like to create a DataStream from a Cassandra table.

The Flink code base contains a CassandraInputFormat [1], but I don't think it is used extensively. You might need to adjust it to your needs. InputFormats are the source connectors for the DataSet API, but they can also be used from the DataStream API to read bounded data. However, InputFormats do not guarantee the order of emitted records, so they are not well suited for time-based applications.

Best, Fabian

2018-03-29 23:09 GMT+02:00 James Yu <[hidden email]>:
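
A note on the exception trace in the original message, offered as a reading of the log rather than a confirmed diagnosis: the Kryo serialization trace ends at com.datastax.driver.core.ArrayBackedResultSet$SinglePage, and the failing call is KryoSerializer.copy inside OperatorChain$CopyingChainingOutput. That suggests the async function handed the driver's ResultSet itself to the next chained operator, and that Flink's attempt to copy it walked through the driver's Cluster and thread objects into the class loader's `classes` Vector, which changed while it was being iterated. If that reading is right, the failure is not a network problem, and emitting plain values extracted from the rows instead of the ResultSet would likely avoid it. The stdlib-only Java sketch below mimics that failure mode. LiveResult, walkEverything, and extractPlainValues are hypothetical names made up for illustration; no Flink, Kryo, or Cassandra API is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Vector;

// Stdlib-only illustration; all names here are hypothetical stand-ins.
class LiveStateCopyDemo {

    // Stand-in for a driver result object that carries both row data
    // and a reference to state that keeps changing at runtime.
    static final class LiveResult {
        final List<String> rowValues;
        final Vector<String> liveState = new Vector<>();

        LiveResult(List<String> rowValues) {
            this.rowValues = rowValues;
            liveState.add("already-loaded");
        }
    }

    // Walks every field, the way a generic reflective copy would.
    // The add() inside the loop simulates the Vector growing while
    // the walk is still in progress.
    static List<String> walkEverything(LiveResult result) {
        List<String> copied = new ArrayList<>(result.rowValues);
        for (String entry : result.liveState) {
            copied.add(entry);
            result.liveState.add("loaded-during-copy");
        }
        return copied;
    }

    // Copies only the row data into a plain list that is safe to hand on.
    static List<String> extractPlainValues(LiveResult result) {
        return new ArrayList<>(result.rowValues);
    }

    public static void main(String[] args) {
        LiveResult result = new LiveResult(Arrays.asList("id-1", "name-1"));
        try {
            walkEverything(result);
        } catch (ConcurrentModificationException e) {
            System.out.println("walking live state failed: " + e);
        }
        System.out.println("plain values: " + extractPlainValues(result));
    }
}
```

In a Flink job, the corresponding step would be to iterate over the rows inside the async callback and emit one tuple or POJO per row. That part is an assumption about the job's code, which was not posted in this thread.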