Re: Anyway to read Cassandra as DataStream/DataSet in Flink?

Posted by Fabian Hueske-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Anyway-to-read-Cassandra-as-DataStream-DataSet-in-Flink-tp19249p19326.html

Hi James,

The answer to your question depends on your use case.
The AsyncIOFunction approach works if you have a DataStream that you would like to enrich with data in a Cassandra table but not if you would like to create a DataStream from a Cassandra table.

The Flink code base contains a CassandraInputFormat [1], but I don't think it is extensively used. You might need to adjust it to your needs.
InputFormats are the source connectors for the DataSet API but can also be used to from the DataStream API to read bounded data. However, InputFormats do not guarantee the order of emitted records. Hence, they are not well suited for time-based applications.

Best, Fabian

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java


2018-03-29 23:09 GMT+02:00 James Yu <[hidden email]>:
Hi,

I tried to treat Cassandra as the source of data in Flink with the information provided in the following links:
- https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink
- https://www.javatips.net/api/flink-master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java

I got the AsyncWaitOperator exception when I run the task. According the the 1st link, this exception occurs due to network problem. However, the strange thing is that I am running Cassandra on my local VM with only 10 rows of data in the target table.

@Jicaar in 1st link also mentions that switching from RichAsyncFunction to RichMapFunction can avoid the AsyncWaitOperator exception, can someone with similar experience share how to do it in RichMapFunction?

AsyncWaitOperator exception trace -->
02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1) (2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
  at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 2 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader)
contextClassLoader (java.lang.Thread)
threads (java.lang.ThreadGroup)
groups (java.lang.ThreadGroup)
threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
val$backingThreadFactory (com.google.common.util.concurrent.ThreadFactoryBuilder$1)
threadFactory (java.util.concurrent.ThreadPoolExecutor)
delegate (com.google.common.util.concurrent.MoreExecutors$ListeningDecorator)
blockingExecutor (com.datastax.driver.core.Cluster$Manager)
manager (com.datastax.driver.core.Host)
triedHosts (com.datastax.driver.core.ExecutionInfo)
info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:82) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505) ~[kryo-2.24.0.jar:na]
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:182) ~[flink-core-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 10 common frames omitted
Caused by: java.util.ConcurrentModificationException: null
  at java.util.Vector$Itr.checkForComodification(Vector.java:1184) ~[na:1.8.0_60]
  at java.util.Vector$Itr.next(Vector.java:1137) ~[na:1.8.0_60]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  ... 68 common frames omitted



This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275