|
this is the error.
org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment
Hi Kali,
What's the exception thrown or error message hinted when executing the erroneous step? Please print them here so that we can investigate the problem.
Hi ,
I am trying to write flink table to streaming Sink it fails at casting Java to Scala or Scala to Java, it fails at below step can anyone help me out ? about this error.
val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"), new SimpleStringEncoder[Row]("UTF-8")).build()
table.addSink(sink2)
package com.aws.examples.kinesis.consumer.TransactionExample
import java.util.Properties
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants} import org.apache.flink.table.api.{Table, TableEnvironment} import com.google.gson.{Gson, JsonObject} import org.apache.flink.api.java.tuple.{Tuple10, Tuple3} import java.sql.{DriverManager, Time}
import com.aws.SchemaJavaClasses.Row1 import org.apache.flink.types.Row import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat} import org.apache.flink.table.api.scala._ import org.apache.flink.table.sinks.CsvTableSink import org.apache.flink.api.java.io.jdbc import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat import org.apache.flink.table.api.java._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.sinks.TableSink import com.aws.customSinks.CsvCustomSink import org.apache.flink.core.fs.Path
import scala.collection.JavaConversions._ import org.apache.flink.table.sources.CsvTableSource import org.apache.flink.table.api.Table import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.java.StreamTableEnvironment import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import com.aws.customSinks.CsvCustomSink import org.apache.flink.streaming.api.functions.sink.SinkFunction
object KinesisConsumer {
def main(args: Array[String]): Unit = {
// set up the streaming execution environment val env = StreamExecutionEnvironment.createLocalEnvironment //env.enableCheckpointing(10)
val tEnv = TableEnvironment.getTableEnvironment(env)
// Get AWS credentials val credentialsProvider = new DefaultAWSCredentialsProviderChain val credentials = credentialsProvider.getCredentials
// Configure Flink Kinesis consumer val consumerConfig = new Properties consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId) consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey) consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
// Create Kinesis stream val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))
val mapFunction: MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]] = new MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]]() {
override def map(s: String): Tuple10[String, String, String,String,String,String,String,String,String,String] = { val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
val csvData = data.getCc_num+","+ data.getFirst+","+ data.getLast+","+ data.getTrans_num+","+ data.getTrans_time+","+ data.getCategory+","+ data.getMerchant+","+ data.getAmt+","+ data.getMerch_lat+","+ data.getMerch_long
//println(csvData)
val p:Array[String] = csvData.split(",") var cc_num:String = p(0) var first:String = p(1) var last:String = p(2) var trans_num:String = p(3) var trans_time:String = p(4) var category:String = p(5) var merchant:String = p(6) var amt:String = p(7) var merch_lat:String = p(8) var merch_long:String = p(9)
val creationDate: Time = new Time(System.currentTimeMillis()) return new Tuple10(cc_num, first, last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long) } }
val data = kinesis.map(mapFunction)
//data.print()
tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')" val table = tEnv.sqlQuery(query)
//println(table.toString())
//val test = new CsvCustomSink("")
val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"), new SimpleStringEncoder[Row]("UTF-8")).build()
table.addSink(sink2)
env.execute()
}
}
-- Thanks & Regards Sri Tummala
-- Thanks & Regards Sri Tummala
-- Thanks & Regards Sri Tummala
|