Cassandra POJO sink Flink 1.4.0 in Scala


Cassandra POJO sink Flink 1.4.0 in Scala

shashank734
Hi,

I have upgraded Flink from 1.3.2 to 1.4.0. I am using the Cassandra sink in my Scala application. Before sinking, I was converting my Scala DataStream to a Java stream and writing it to Cassandra. I have created a POJO class in Scala like this:

import com.datastax.driver.mapping.annotations.Table
import scala.beans.BeanProperty

@SerialVersionUID(507L)
@Table(keyspace = "neofp", name = "order_detail")
case class OrderFinal(
    @BeanProperty var order_name: String,
    @BeanProperty var user: String) extends Serializable {

  // Zero-argument constructor required by the Cassandra object mapper
  def this() {
    this("NA", "NA")
  }
}

This was working fine with the sink, but after upgrading to 1.4.0 it gives the error "Query must not be null or empty."

After digging into the CassandraSink code, I found that it treats the class as a Scala case class and runs CassandraScalaProductSinkBuilder, whose sanityCheck requires a query to be set.

So how can I create a POJO class in Scala so that CassandraSink handles it with CassandraPojoSinkBuilder?

As a workaround, for now I have downgraded only the connector to 1.3.2.


Thanks
Shashank


Re: Cassandra POJO sink Flink 1.4.0 in Scala

Timo Walther
Hi Shashank,

Scala case classes are treated as a special tuple type in Flink. If you want to make a POJO out of it, just remove the "case" keyword and make sure that the class is static (e.g., declared top-level or in a companion object).

I hope that helps.

Timo





Re: Cassandra POJO sink Flink 1.4.0 in Scala

shashank734
I have tried that by creating the class with a companion object:

@SerialVersionUID(507L)
@Table(keyspace = "neofp", name = "order_detail")
class OrderFinal(
    @BeanProperty var order_name: String,
    @BeanProperty var user: String) extends Serializable {

  def this() {
    this("NA", "NA")
  }
}

object OrderFinal



When running with 1.4.0, it gives the following error:


java.lang.NoClassDefFoundError: Could not initialize class com.datastax.driver.core.NettyUtil
at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

12/19/2017 18:04:33 Job execution switched to status FAILING.



Re: Cassandra POJO sink Flink 1.4.0 in Scala

Timo Walther
Hi Shashank,

the exception you get is a known issue [0] that will be fixed in Flink 1.4.1. We improved the dependency management, but it seems this currently causes some problems with the Cassandra connector.

As a workaround, you can add Netty (version 4.0) to your dependencies. This should fix the problem until the new Flink version is out. Please let us know if it does.
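
A minimal sbt sketch of that workaround (hypothetical; the exact Netty 4.0.x patch version is an assumption - pick one compatible with your DataStax driver):

// build.sbt - sketch: pin Netty 4.0.x alongside the Cassandra connector
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-connector-cassandra" % "1.4.0",
  // version below is an assumption; any recent 4.0.x release should do
  "io.netty" % "netty-all" % "4.0.44.Final"
)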

Regards,
Timo

[0] https://issues.apache.org/jira/browse/FLINK-8295






Re: Cassandra POJO sink Flink 1.4.0 in Scala

shashank734
Hi,

I have added netty-all 4.0 as a dependency and now it's working fine. The only thing is that I had to create the POJO class in Scala like this:

@SerialVersionUID(507L)
@Table(keyspace = "twtt", name = "order")
class OrderFinal(
    @BeanProperty var name: String,
    @BeanProperty var userEmail: String) extends Serializable {

  def this() {
    this("NA", "NA")
  }
}


If I remove @BeanProperty or convert var to val, it gives an error that no setters or getters were found, or that multiple were found. This is the final workaround I found.
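
For completeness, a minimal sketch of wiring that POJO stream into the sink (the host and sample data are assumptions; note that no query is set, because the @Table mapping drives the insert):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

val env = StreamExecutionEnvironment.getExecutionEnvironment
// uses the OrderFinal POJO class defined above
val orders: DataStream[OrderFinal] =
  env.fromElements(new OrderFinal("order-1", "user@example.com"))

// no setQuery(...): the POJO builder relies on the @Table/@BeanProperty mapping
CassandraSink.addSink(orders)
  .setHost("127.0.0.1") // assumption: local Cassandra
  .build()

env.execute("cassandra-pojo-sink")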






Re: Cassandra POJO sink Flink 1.4.0 in Scala

Timo Walther
Thanks for letting us know. The Netty issue will be fixed in Flink 1.4.1.

For case classes there is also a dedicated Cassandra sink (every case class is a Product):

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
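
Usage then looks roughly like the sketch below (the keyspace, table, and column order in the CQL query are assumptions; with this sink the insert query is mandatory):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

case class OrderFinal(order_name: String, user: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val orders: DataStream[OrderFinal] = env.fromElements(OrderFinal("order-1", "alice"))

// case classes dispatch to the query-based builder, so setQuery is required
CassandraSink.addSink(orders)
  .setQuery("INSERT INTO neofp.order_detail (order_name, user) VALUES (?, ?);")
  .setHost("127.0.0.1") // assumption: local Cassandra
  .build()

env.execute("cassandra-scala-product-sink")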

Regards,
Timo






Re: Cassandra POJO sink Flink 1.4.0 in Scala

shashank734
Yes, but when CassandraScalaProductSinkBuilder is invoked after CassandraSink identifies the type as a case class, it runs sanityCheck and throws the exception, because no query is passed in that case.

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
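
For reference, a paraphrased sketch of that dispatch (not the exact Flink source, just the effective behavior - which is why only a plain class reaches the POJO builder):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.PojoTypeInfo
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

// hypothetical helper mirroring how CassandraSink.addSink chooses a builder
def describeBuilder(typeInfo: TypeInformation[_]): String = typeInfo match {
  case _: CaseClassTypeInfo[_] => "CassandraScalaProductSinkBuilder: sanityCheck demands a query"
  case _: PojoTypeInfo[_]      => "CassandraPojoSinkBuilder: no query, uses the @Table mapping"
  case _                       => "tuple/row builders, or IllegalArgumentException"
}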


Re: Cassandra POJO sink Flink 1.4.0 in Scala

Michael Fong
Hi, shashank agarwal


AFAIK, on the Java side, for a POJO data type you don't need to set a query, since the CQL data mapping takes care of that; when dealing with Java tuples, you do need to provide an upsert query so that Cassandra knows what to insert into the table.
The Scala tuple case is clear - same as Java, provide a CQL query; however, I don't know what the story is for the Scala POJO (case class) case...

Regards,

Michael

Re: Cassandra POJO sink Flink 1.4.0 in Scala

Michael Fong
Hi, shashank agarwal


Not sure I can fully answer your question, but after digging into some code, I am not sure the C* connector fully supports Scala case classes + CQL data mapping at the moment. I may be totally wrong, and you would need to ask the Flink devs about this. However, I have some toy examples you could check out that use CassandraScalaProductSinkBuilder + a predefined CQL query + an entity. I am not using Scala case classes, so they may not fit your need.

You may find the example snippet @

Regards,



Re: Cassandra POJO sink Flink 1.4.0 in Scala

shashank734
Hi Michael,

Thanks for the response. Actually, I have already solved the issue and was sharing how I solved it. To sink Scala classes like Java POJOs, we previously had to convert the stream to a Java stream first, but in 1.4 that is already done by the connector, so it's no longer needed.

We have to write the Scala class like this:

@SerialVersionUID(507L)
@Table(keyspace = "twtt", name = "order")
class OrderFinal(
    @BeanProperty var name: String,
    @BeanProperty var userEmail: String) extends Serializable {

  def this() {
    this("NA", "NA")
  }
}

The variable names should match the column names in the Cassandra table. If we use a case class instead of a plain class, the connector will call CassandraScalaProductSinkBuilder, which requires a query. So we make the Scala class behave like a POJO class.
Scala classes should really be handled separately in the connector; I will request a feature for that. Until then, this is the workaround for Scala developers.
 




