Fwd: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.

sri hari kali charan Tummala

Hi All, 

I am trying to select distinct values from a SQL query result and write them to CSV, but it fails with the error below.

Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:145)
at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:153)
at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)

Code example:
val data = kinesis.map(mapFunction)
tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)
import org.apache.flink.streaming.api.scala._
tEnv.sqlQuery(query).distinct().toRetractStream[Row]
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
    FileSystem.WriteMode.NO_OVERWRITE, "~", "|")
 
Thanks & Regards
Sri Tummala



Re: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.

sri hari kali charan Tummala
Is this a bug in Flink Scala?

Full code and Maven POM:

package com.aws.examples.kinesis.consumer.TransactionExample

import java.lang
import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}

import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.core.fs.{FileSystem, Path}

import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.api.java.tuple
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.sinks.AppendStreamTableSink
import org.apache.flink.table.sinks.RetractStreamTableSink
import org.apache.flink.api.java.DataSet

object KinesisConsumer extends RetractStreamTableSink[Row] {

  override def configure(strings: Array[String], typeInformations: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???

  override def getFieldNames: Array[String] = ???

  override def getFieldTypes: Array[TypeInformation[_]] = ???

  override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???

  override def getOutputType(): TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType()

  override def getRecordType: TypeInformation[Row] = ???

  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.enableCheckpointing(10)

    val tEnv: org.apache.flink.table.api.java.StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

    // Get AWS credentials
    val credentialsProvider = new DefaultAWSCredentialsProviderChain
    val credentials = credentialsProvider.getCredentials

    // Configure Flink Kinesis consumer
    val consumerConfig = new Properties
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

    // Create Kinesis stream
    val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))

    val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
      new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {

        override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {

          val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

          val csvData = data.getCc_num + "," +
            data.getFirst + "," +
            data.getLast + "," +
            data.getTrans_num + "," +
            data.getTrans_time + "," +
            data.getCategory + "," +
            data.getMerchant + "," +
            data.getAmt + "," +
            data.getMerch_lat + "," +
            data.getMerch_long

          //println(csvData)

          val p: Array[String] = csvData.split(",")
          var cc_num: String = p(0)
          var first: String = p(1)
          var last: String = p(2)
          var trans_num: String = p(3)
          var trans_time: String = p(4)
          var category: String = p(5)
          var merchant: String = p(6)
          var amt: String = p(7)
          var merch_lat: String = p(8)
          var merch_long: String = p(9)

          val creationDate: Time = new Time(System.currentTimeMillis())
          return new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
        }
      }

    val data = kinesis.map(mapFunction)
    tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
    val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
    val table = tEnv.sqlQuery(query)
    import org.apache.flink.streaming.api.scala._
    tEnv.sqlQuery(query).distinct().toRetractStream[Row]
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
        FileSystem.WriteMode.NO_OVERWRITE, "~", "|")

    env.execute()
  }
}
POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issues like NoClassDefFoundError, ... -->
                    <!-- <useManifestOnlyJar>false</useManifestOnlyJar> -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.derby</groupId>
            <artifactId>derby</artifactId>
            <version>10.13.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.8.8</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.579</version>
        </dependency>

        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.2.2</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.1</version>
        </dependency>

        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.4.1</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.579</version>
        </dependency>
    </dependencies>

</project>


--
Thanks & Regards
Sri Tummala


Re: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.

Hequn Cheng
Hi Sri,

For Scala jobs, we should import the corresponding Scala environment and DataStream classes, e.g.:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
See example here[1].
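
As a minimal self-contained sketch of the corrected setup (against Flink 1.8; the object name, stream contents, and field names below are made up for illustration, not taken from your job):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object ScalaTableToStream {
  def main(args: Array[String]): Unit = {
    // Scala StreamExecutionEnvironment, so getTableEnvironment returns the
    // Scala StreamTableEnvironment, on which toRetractStream[Row] compiles.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Illustrative input; in your job this would be the mapped Kinesis stream.
    val data: DataStream[(String, String)] =
      env.fromElements(("111", "a"), ("111", "a"), ("222", "b"))
    tEnv.registerDataStream("transactions", data, 'cc_num, 'first_column)

    val table = tEnv.sqlQuery("SELECT DISTINCT cc_num, first_column FROM transactions")

    // Retract stream elements are (isAccumulate: Boolean, row: Row).
    table.toRetractStream[Row].print()

    env.execute()
  }
}

With the Java StreamExecutionEnvironment from org.apache.flink.streaming.api.environment, getTableEnvironment returns the Java StreamTableEnvironment instead, and toRetractStream on its tables fails with exactly the TableException you saw.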

Best,
Hequn


