Hi Sri,
For Scala jobs, you should import the corresponding Scala environment and DataStream classes, e.g.:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
See the example here [1].
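For reference, here is a minimal sketch of the all-Scala setup (Flink 1.8; the socket source, table name, and field name are placeholders, not from your job):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row

// env is the *Scala* StreamExecutionEnvironment, so getTableEnvironment
// returns the Scala StreamTableEnvironment and toRetractStream works.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

// placeholder source: any Scala DataStream works here
val words: DataStream[String] = env.socketTextStream("localhost", 9999)
tEnv.registerDataStream("words", words, 'word)

// the table originates from a Scala DataStream, so the Scala conversion succeeds
tEnv.sqlQuery("SELECT word FROM words").toRetractStream[Row].print()

env.execute()

In your code below, env comes from org.apache.flink.streaming.api.environment (the Java API), so getTableEnvironment returns the Java StreamTableEnvironment, and the Scala toRetractStream conversion then throws the TableException you posted.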
Best,
Hequn
On Tue, Jul 16, 2019 at 11:03 PM sri hari kali charan Tummala <[hidden email]> wrote:

Is this a bug in Flink Scala?
Full code and Maven POM:-
package com.aws.examples.kinesis.consumer.TransactionExample
import java.lang
import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}

import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.core.fs.{FileSystem, Path}

import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.api.java.tuple
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.sinks.AppendStreamTableSink
import org.apache.flink.table.sinks.RetractStreamTableSink
import org.apache.flink.api.java.DataSet
object KinesisConsumer extends RetractStreamTableSink[Row] {
override def configure(strings: Array[String], typeInformations: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
override def getFieldNames: Array[String] = ???
override def getFieldTypes: Array[TypeInformation[_]] = ???
override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
override def getOutputType(): TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType()
override def getRecordType: TypeInformation[Row] = ???
def main(args: Array[String]): Unit = {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
//env.enableCheckpointing(10)

val tEnv: org.apache.flink.table.api.java.StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))
val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
  new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {
override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
val csvData = data.getCc_num + "," + data.getFirst + "," + data.getLast + "," + data.getTrans_num + "," + data.getTrans_time + "," + data.getCategory + "," + data.getMerchant + "," + data.getAmt + "," + data.getMerch_lat + "," + data.getMerch_long
//println(csvData)
val p: Array[String] = csvData.split(",")
var cc_num: String = p(0)
var first: String = p(1)
var last: String = p(2)
var trans_num: String = p(3)
var trans_time: String = p(4)
var category: String = p(5)
var merchant: String = p(6)
var amt: String = p(7)
var merch_lat: String = p(8)
var merch_long: String = p(9)

val creationDate: Time = new Time(System.currentTimeMillis())
return new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
  }
}
val data = kinesis.map(mapFunction)
tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)
import org.apache.flink.streaming.api.scala._
tEnv.sqlQuery(query).distinct().toRetractStream[Row]
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4", FileSystem.WriteMode.NO_OVERWRITE, "~", "|")
env.execute()
  }
}

POM:-
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issue like NoDefClassError,... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.derby</groupId>
            <artifactId>derby</artifactId>
            <version>10.13.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.8.8</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.579</version>
        </dependency>
        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.1</version>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.579</version>
        </dependency>
    </dependencies>
</project>
On Tue, Jul 16, 2019 at 11:00 AM sri hari kali charan Tummala <[hidden email]> wrote:

Hi All,
I am trying to apply DISTINCT to my SQL query results and write them to CSV, which fails with the error below.
Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
    at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:145)
    at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:153)
    at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)
Code Example:-

val data = kinesis.map(mapFunction)
tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)
import org.apache.flink.streaming.api.scala._
tEnv.sqlQuery(query).distinct().toRetractStream[Row]
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4", FileSystem.WriteMode.NO_OVERWRITE, "~", "|")
Thanks & Regards
Sri Tummala
--
Thanks & Regards
Sri Tummala