Fwd: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Fwd: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

sri hari kali charan Tummala
Hi , 

I am trying to write flink table to streaming Sink it fails at casting Java to Scala or Scala to Java, it fails at below step can anyone help me out ? about this error.


val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
new SimpleStringEncoder[Row]("UTF-8")).build()

table.addSink(sink2)

package com.aws.examples.kinesis.consumer.TransactionExample

import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.api.{Table, TableEnvironment}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}

import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.java.io.jdbc
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.java._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.sinks.TableSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.core.fs.Path

import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object KinesisConsumer {

def main(args: Array[String]): Unit = {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]] =
new MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]]() {

override def map(s: String): Tuple10[String, String, String,String,String,String,String,String,String,String] = {
val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

val csvData = data.getCc_num+","+
data.getFirst+","+
data.getLast+","+
data.getTrans_num+","+
data.getTrans_time+","+
data.getCategory+","+
data.getMerchant+","+
data.getAmt+","+
data.getMerch_lat+","+
data.getMerch_long

//println(csvData)

val p:Array[String] = csvData.split(",")
var cc_num:String = p(0)
var first:String = p(1)
var last:String = p(2)
var trans_num:String = p(3)
var trans_time:String = p(4)
var category:String = p(5)
var merchant:String = p(6)
var amt:String = p(7)
var merch_lat:String = p(8)
var merch_long:String = p(9)

val creationDate: Time = new Time(System.currentTimeMillis())
return new Tuple10(cc_num, first, last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
}
}

val data = kinesis.map(mapFunction)

//data.print()

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

//println(table.toString())

//val test = new CsvCustomSink("")

val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
new SimpleStringEncoder[Row]("UTF-8")).build()

table.addSink(sink2)

env.execute()

}

}


--
Thanks & Regards
Sri Tummala



--
Thanks & Regards
Sri Tummala

Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

Caizhi Weng
Hi Kali,

What's the exception thrown or error message hinted when executing the erroneous step? Please print them here so that we can investigate the problem.

sri hari kali charan Tummala <[hidden email]> 于2019年7月16日周二 上午4:49写道:
Hi , 

I am trying to write flink table to streaming Sink it fails at casting Java to Scala or Scala to Java, it fails at below step can anyone help me out ? about this error.


val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
new SimpleStringEncoder[Row]("UTF-8")).build()

table.addSink(sink2)

package com.aws.examples.kinesis.consumer.TransactionExample

import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.api.{Table, TableEnvironment}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}

import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.java.io.jdbc
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.java._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.sinks.TableSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.core.fs.Path

import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object KinesisConsumer {

def main(args: Array[String]): Unit = {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]] =
new MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]]() {

override def map(s: String): Tuple10[String, String, String,String,String,String,String,String,String,String] = {
val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

val csvData = data.getCc_num+","+
data.getFirst+","+
data.getLast+","+
data.getTrans_num+","+
data.getTrans_time+","+
data.getCategory+","+
data.getMerchant+","+
data.getAmt+","+
data.getMerch_lat+","+
data.getMerch_long

//println(csvData)

val p:Array[String] = csvData.split(",")
var cc_num:String = p(0)
var first:String = p(1)
var last:String = p(2)
var trans_num:String = p(3)
var trans_time:String = p(4)
var category:String = p(5)
var merchant:String = p(6)
var amt:String = p(7)
var merch_lat:String = p(8)
var merch_long:String = p(9)

val creationDate: Time = new Time(System.currentTimeMillis())
return new Tuple10(cc_num, first, last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
}
}

val data = kinesis.map(mapFunction)

//data.print()

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

//println(table.toString())

//val test = new CsvCustomSink("")

val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
new SimpleStringEncoder[Row]("UTF-8")).build()

table.addSink(sink2)

env.execute()

}

}


--
Thanks & Regards
Sri Tummala



--
Thanks & Regards
Sri Tummala

Reply | Threaded
Open this post in threaded view
|

Re: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

sri hari kali charan Tummala
this is the error.

org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment


On Mon, Jul 15, 2019 at 9:54 PM Caizhi Weng <[hidden email]> wrote:
Hi Kali,

What's the exception thrown or error message hinted when executing the erroneous step? Please print them here so that we can investigate the problem.

sri hari kali charan Tummala <[hidden email]> 于2019年7月16日周二 上午4:49写道:
Hi , 

I am trying to write flink table to streaming Sink it fails at casting Java to Scala or Scala to Java, it fails at below step can anyone help me out ? about this error.


val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
new SimpleStringEncoder[Row]("UTF-8")).build()

table.addSink(sink2)

package com.aws.examples.kinesis.consumer.TransactionExample

import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.api.{Table, TableEnvironment}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}

import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.java.io.jdbc
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.java._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.sinks.TableSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.core.fs.Path

import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object KinesisConsumer {

def main(args: Array[String]): Unit = {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]] =
new MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]]() {

override def map(s: String): Tuple10[String, String, String,String,String,String,String,String,String,String] = {
val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

val csvData = data.getCc_num+","+
data.getFirst+","+
data.getLast+","+
data.getTrans_num+","+
data.getTrans_time+","+
data.getCategory+","+
data.getMerchant+","+
data.getAmt+","+
data.getMerch_lat+","+
data.getMerch_long

//println(csvData)

val p:Array[String] = csvData.split(",")
var cc_num:String = p(0)
var first:String = p(1)
var last:String = p(2)
var trans_num:String = p(3)
var trans_time:String = p(4)
var category:String = p(5)
var merchant:String = p(6)
var amt:String = p(7)
var merch_lat:String = p(8)
var merch_long:String = p(9)

val creationDate: Time = new Time(System.currentTimeMillis())
return new Tuple10(cc_num, first, last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
}
}

val data = kinesis.map(mapFunction)

//data.print()

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

//println(table.toString())

//val test = new CsvCustomSink("")

val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
new SimpleStringEncoder[Row]("UTF-8")).build()

table.addSink(sink2)

env.execute()

}

}


--
Thanks & Regards
Sri Tummala



--
Thanks & Regards
Sri Tummala



--
Thanks & Regards
Sri Tummala