Error while sinking results to Cassandra using Flink Cassandra Connector

manvmali
Hi, I am having trouble writing a data stream result to Cassandra. I tried a sample job that just sinks a Tuple into Cassandra, and I keep getting “java.lang.NoSuchMethodError” when running the job on Flink deployed on Kubernetes.


Here is the sample WriteToCassandraJob:

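import java.util.ArrayList;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class WriteToCassandra {

    private static final String INSERT = "INSERT INTO kmv.testtable (element1, element2) VALUES (?, ?)";
    private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20);

    // Build 20 sample tuples: ("cassandra-0", 0) ... ("cassandra-19", 19).
    static {
        for (int i = 0; i < 20; i++) {
            collection.add(new Tuple2<>("cassandra-" + i, i));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);

        // Write each tuple to Cassandra using the prepared INSERT statement.
        CassandraSink.addSink(source)
            .setQuery(INSERT)
            .setClusterBuilder(new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Builder builder) {
                    // addContactPoint expects a host without a port; the port is set separately.
                    return builder.addContactPoint("172.28.232.144").withPort(9042).build();
                }
            })
            .build();

        env.execute("WriteTupleIntoCassandra");
    }
}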


Stack trace:

org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.<init>(CassandraSinkBase.java:67)
    at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.<init>(AbstractCassandraTupleSink.java:40)
    at org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink.<init>(CassandraTupleSink.java:46)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSink$CassandraTupleSinkBuilder.createSink(CassandraSink.java:454)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSink$CassandraSinkBuilder.build(CassandraSink.java:415)
    at com.odc.atlas.testjob.WriteToCassandra.main(WriteToCassandra.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)


I also verified that my Flink and Scala versions are consistent. My Gradle dependencies look like this:
implementation "org.scala-lang:scala-library:2.12.8"
implementation "org.apache.avro:avro:1.8.1"
implementation "org.apache.flink:flink-scala_2.12:1.8.0"
implementation "org.apache.flink:flink-streaming-scala_2.12:1.8.0"
implementation "org.apache.flink:flink-avro:1.8.0"
implementation "org.apache.flink:flink-connector-kafka-0.11_2.12:1.8.0"
implementation "org.apache.flink:flink-connector-cassandra_2.12:1.8.0"


Any pointers would be appreciated!

Thanks,
Manvi

Re: Error while sinking results to Cassandra using Flink Cassandra Connector

Fabian Hueske-2
Hi Manvi,

A NoSuchMethodError typically indicates a version mismatch.
I would check if the Flink versions of your program, the client, and the cluster are the same.
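
One way to narrow this down is to ask the running JVM directly. The descriptor in the error, clean(Ljava/lang/Object;Z)V, denotes a method clean(Object, boolean) that returns void, so the connector was compiled against a ClosureCleaner with that signature and the one on the classpath at runtime does not have it. The following is only a rough sketch, not part of Flink: MethodProbe and its two helpers are hypothetical names, and it uses plain JDK reflection to report whether a given method exists and which jar the class was loaded from. It assumes you run it with the same classpath (or inside the same job jar) as the failing program.

```java
import java.security.CodeSource;

public class MethodProbe {

    /** Returns true if the class can be loaded and has a public method with exactly these parameter types. */
    public static boolean hasMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            // initialize=false: look at the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, MethodProbe.class.getClassLoader());
            cls.getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    /** Returns the location (typically a jar URL) the class was loaded from, or a placeholder string. */
    public static String locationOf(String className) {
        try {
            Class<?> cls = Class.forName(className, false, MethodProbe.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            return source == null ? "(bootstrap or unknown)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(class not found)";
        }
    }

    public static void main(String[] args) {
        // Class and method taken from the "Caused by" line of the stack trace.
        String cls = "org.apache.flink.api.java.ClosureCleaner";
        System.out.println("loaded from: " + locationOf(cls));
        System.out.println("clean(Object, boolean) present: "
                + hasMethod(cls, "clean", Object.class, boolean.class));
    }
}
```

If the second line prints false on the cluster but true in your local build, the jar named in the first line tells you which Flink distribution is actually being used there.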

Best, Fabian

On Tue, 20 Aug 2019 at 21:09, manvmali <[hidden email]> wrote:
> Hi, I am having trouble writing a data stream result to Cassandra. I tried a sample job that just sinks a Tuple into Cassandra, and I keep getting “java.lang.NoSuchMethodError” when running the job on Flink deployed on Kubernetes.

