HI, I have upgraded flink from 1.3.2 to 1.4.0. I am using cassandra sink in my scala application. Before sink, i was converting my scala datastream to java stream and sinking in Cassandra. I have created pojo class in scala liked that : @SerialVersionUID(507L) @Table(keyspace = "neofp", name = "order_detail") case class OrderFinal( @BeanProperty var order_name: String, @BeanProperty var user: String )extends Serializable { def this() { this("NA", "NA",) } } and this was working fine with sink. after upgrading to 1.4.0 it's giving error "Query must not be null or empty." After dig into the CassandraSink code, I have found it's treating it as case class and running CassandraScalaProductSinkBuilder which do sanityCheck of query existence. So how I can create POJO class in scala so CassandraSink treats it as CassandraPojoSinkBuilder? For workaround now I have downgraded the only connector to 1.3.2 Thanks Shashank |
Hi Shashank,
Scala case classes are treated as a special tuple type in Flink. If you want to make a POJO out of it, just remove the "case" keyword and make sure that the class is static (in the companion object). I hope that helps. Timo Am 12/19/17 um 11:04 AM schrieb shashank agarwal:
|
In reply to this post by shashank734
I have tried that by creating class with companion static object: @SerialVersionUID(507L) @Table(keyspace = "neofp", name = "order_detail") class OrderFinal( @BeanProperty var order_name: String, @BeanProperty var user: String )extends Serializable { def this() { this("NA", "NA",) } } object OrderFinal { } When running with 1.4.0 it's giving following error : java.lang.NoClassDefFoundError: Could not initialize class com.datastax.driver.core.NettyUtil at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96) at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706) at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365) at com.datastax.driver.core.Cluster.init(Cluster.java:162) at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333) at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308) at com.datastax.driver.core.Cluster.connect(Cluster.java:250) at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88) at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:745) 12/19/2017 18:04:33 Job execution switched to status FAILING. java.lang.NoClassDefFoundError: Could not initialize class com.datastax.driver.core.NettyUtil at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96) at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706) at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365) at com.datastax.driver.core.Cluster.init(Cluster.java:162) at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333) at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308) at com.datastax.driver.core.Cluster.connect(Cluster.java:250) at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88) at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:745) On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal <[hidden email]> wrote:
Thanks Regards SHASHANK AGARWAL --- Trying to mobilize the things.... |
Hi Shashank,
the exception you get is a known issue [0] that will be fixed with Flink 1.4.1. We improved the dependency management but it seems this causes some problems with the Cassandra connector right now. So as a workaround you can add netty (version 4.0) to your dependencies. This should fix the problem until the new Flink version. Please let us know if yes. Regards, Timo [0] https://issues.apache.org/jira/browse/FLINK-8295 Am 12/19/17 um 1:41 PM schrieb shashank agarwal:
|
In reply to this post by shashank734
Hi, I have added netty-all 4.0 as dependency now it's working fine. Only thing I had to create POJO class ion scala like this. @SerialVersionUID(507L) @Table(keyspace = "twtt", name = "order") class OrderFinal( @BeanProperty var name: String, @BeanProperty var userEmail: String)extends Serializable { def this() { this("NA", "NA") } } If I am removing @BeanProperty or converting var to Val. It's giving error of no setters or getters found or multiple found. This is the final workaround i found. On Tue, Dec 19, 2017 at 6:11 PM, shashank agarwal <[hidden email]> wrote:
Thanks Regards SHASHANK AGARWAL --- Trying to mobilize the things.... |
Thanks for letting us know. The netty
issue will be fixed in Flink 1.4.1.
For case classes there is also a dedicated cassandra sink (every case class is a Product): https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java Regards, Timo Am 12/21/17 um 1:39 PM schrieb shashank agarwal:
|
Yes but when CassandraScalaProductSinkBuilder called after identifying case class in CassandraSink class it will do sanityCheck and will throw the exception cause It won’t pass any query in that case. On Thu, 21 Dec 2017 at 7:36 PM, Timo Walther <[hidden email]> wrote:
-- Sent from iPhone 5
|
Hi, shashank agarwal
AFAIK, in java side, for a pojo data type, you don't need to set query since the CQL data mapping would take care of that whereas dealing with java tuples, you do need to provide a upsert query so that cassandra knows what to insert into the table. Scala tuple case is clear, same as java - providing a CQL query; however, I don't know what's up with Scala pojo case (class) though... Regards, Michael |
Hi, shashank agarwal Not sure if I can answer fully your question, but after digging some code, I am not sure if C* connector totally supports Scala case class + CQL data mapping at the moment. I may be totally wrong, and you need to ask the flink dev about this. However, I have some toy examples that you could check out to see which uses CassandraScalaProductSink The example snippet you may find @ Regards, On Thu, Dec 28, 2017 at 1:11 PM, Michael Fong <[hidden email]> wrote:
|
Hi Micheal, Thanks for the response actually I have solved the issue. I was sharing my knowledge how I solved that. For sinking scala classes like JAVA Pojo. We have to convert that to JavaStream first but in 1.4 that already done by connector so no need to do that in 1.4 We have to write scala class like this. @SerialVersionUID(507L) @Table(keyspace = "twtt", name = "order") class OrderFinal( @BeanProperty var name: String, @BeanProperty var userEmail: String)extends Serializable { def this() { this("NA", "NA") } } The variable name should be same as the column name in Cassandra table. If we use case class in case of class than that will call CassandraScalaProductSink Scala class should be handled in connector separately. I will request the feature for the same. While that this is workaround for scala developers. On Thu, Dec 28, 2017 at 11:28 AM, Michael Fong <[hidden email]> wrote:
Thanks Regards SHASHANK AGARWAL --- Trying to mobilize the things.... |
Free forum by Nabble | Edit this page |