table toRetractStream missing last record and adding extra column (True)

table toRetractStream missing last record and adding extra column (True)

sri hari kali charan Tummala
Hi All, 

I am trying to write a toRetractStream to CSV, which is kind of working OK, but I get extra values like "true" in front of my output data values.

Question 1:
I don't want "true" in my output data; how can I achieve this?

Question 2:
In the output file (CSV) I am missing the data in the last line; is the toRetractStream sink closing before it finishes writing to the file?

Screenshots attached.

Code:

val data = kinesis.map(mapFunction)
tEnv.registerDataStream("transactions", data,
  "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)
tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
    FileSystem.WriteMode.OVERWRITE, "\n", "|")


--
Thanks & Regards
Sri Tummala


Attachments: Screen Shot 2019-07-16 at 11.52.26 AM.png (107K), Screen Shot 2019-07-16 at 11.53.05 AM.png (44K)

Re: table toRetractStream missing last record and adding extra column (True)

taher koitawala-2
Looks like you need a window.

Re: table toRetractStream missing last record and adding extra column (True)

sri hari kali charan Tummala
Windows for question 1, question 2, or both?

Thanks
Sri 

Re: table toRetractStream missing last record and adding extra column (True)

Hequn Cheng
Hi Sri,

Question 1:
You can use a map to filter out the "true", i.e., ds.map(_._2).
Note that it is OK to remove the "true" flag for distinct, since it does not generate updates. For queries that contain updates, such as a non-window group by, you should not filter out the flag, or the result will be incorrect.
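
A minimal sketch of the shape of this, reusing the table and imports from the code above (untested, just to illustrate):

// toRetractStream emits (flag, row) pairs: flag = true for an add, false for a retract.
// For a distinct-only query the flag is always true, so keeping just the payload is safe.
val rowsOnly = table
  .toRetractStream(TypeInformation.of(classOf[Row]))
  .map(_._2) // drop the Boolean flag, keep the Row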

Question 2:
I can't reproduce this problem in my local environment. Maybe there is something wrong with the source data?

Best, Hequn

Re: table toRetractStream missing last record and adding extra column (True)

sri hari kali charan Tummala
Question 1:

I am trying to convert a Tuple2[Boolean, Row] to a Row using a map function, and I am getting an error asking me for InferedR. What is InferedR in Flink?
val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row], AnyVal] =
  new MapFunction[tuple.Tuple2[Boolean, Row], AnyVal]() {
    override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
      t.f1
    }
    /*
    override def map(t: tuple.Tuple2[Boolean, Row], collector: Collector[Object]): Unit = {
      collector.collect(t.f1)
    }
    */
  }

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
    FileSystem.WriteMode.OVERWRITE, "\n", "|")
When I try that, I get a different type of error:

Error:(143, 74) type mismatch;
 found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
 required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
    tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
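
For what it's worth, the error itself points at the mismatch: the Java-bridge toRetractStream(table, classOf[Row]) emits tuple.Tuple2[java.lang.Boolean, Row], not tuple.Tuple2[scala.Boolean, Row]. A sketch of a version that should line up with that signature (an untested suggestion; note the output type is Row rather than AnyVal, and it assumes org.apache.flink.api.scala._ is in scope for the implicit TypeInformation):

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple
import org.apache.flink.types.Row

// Declare the tuple with java.lang.Boolean, which is what the Java Table API
// bridge actually produces, and return Row instead of AnyVal.
val mymapFunction: MapFunction[tuple.Tuple2[java.lang.Boolean, Row], Row] =
  new MapFunction[tuple.Tuple2[java.lang.Boolean, Row], Row]() {
    override def map(t: tuple.Tuple2[java.lang.Boolean, Row]): Row = t.f1
  }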

Question 2:
I don't have any source data issue; reproducing this for testing is simple, as sketched below:

create a Kinesis stream
run the producer
then run the consumer
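
For reference, a hypothetical producer sketch (my own illustration, not code from this thread; it assumes the AWS SDK v1 is on the classpath, and the JSON field names are inferred from the TransactionJsonClass getters in the full program below):

import java.nio.ByteBuffer
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest

object SampleTransactionProducer {
  def main(args: Array[String]): Unit = {
    val kinesis = AmazonKinesisClientBuilder.defaultClient()
    // Made-up sample record; the field names mirror the consumer's TransactionJsonClass.
    val json = """{"cc_num":"1234567890123456","first":"John","last":"Doe","trans_num":"t0001","trans_time":"2019-07-16 11:52:00","category":"grocery","merchant":"acme","amt":"42.50","merch_lat":"40.71","merch_long":"-74.00"}"""
    kinesis.putRecord(new PutRecordRequest()
      .withStreamName("credittransactions3")
      .withPartitionKey("1")
      .withData(ByteBuffer.wrap(json.getBytes("UTF-8"))))
  }
}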

Thanks
Sri

--
Thanks & Regards
Sri Tummala

Re: table toRetractStream missing last record and adding extra column (True)

sri hari kali charan Tummala
Amazing, all issues resolved in one go, thanks Cheng. One issue though: I can't write the map(_._2) output to CSV; that doesn't seem to be supported right now (writeAsCsv on a DataStream appears to work only for Flink tuple types, and Row is not one), so it has to be a text file.

Below is the full code in Scala, in case someone wants it.

Git Code is here:-

package com.aws.examples.kinesis.consumer.transactionExampleScala

import java.util.Properties
import java.sql.{DriverManager, Time}

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
import com.google.gson.Gson
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TransactionScalaTest {

  /*
  extends RetractStreamTableSink[Row]

  override def configure(strings: Array[String], typeInformations: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???

  override def getFieldNames: Array[String] = ???

  override def getFieldTypes: Array[TypeInformation[_]] = ???

  override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???

  override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType

  override def getRecordType: TypeInformation[Row] = ???
  */

  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.enableCheckpointing(10000)

    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Get AWS credentials
    val credentialsProvider = new DefaultAWSCredentialsProviderChain
    val credentials = credentialsProvider.getCredentials

    // Configure Flink Kinesis consumer
    val consumerConfig = new Properties
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

    // Create Kinesis stream
    val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(), consumerConfig))

    val mapFunction: MapFunction[String, (String, String, String, String, String, String, String, String, String, String)] =
      new MapFunction[String, (String, String, String, String, String, String, String, String, String, String)]() {

        override def map(s: String): (String, String, String, String, String, String, String, String, String, String) = {

          val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

          val csvData = data.getCc_num + "," +
            data.getFirst + "," +
            data.getLast + "," +
            data.getTrans_num + "," +
            data.getTrans_time + "," +
            data.getCategory + "," +
            data.getMerchant + "," +
            data.getAmt + "," +
            data.getMerch_lat + "," +
            data.getMerch_long

          //println(csvData)

          val p: Array[String] = csvData.split(",")
          var cc_num: String = p(0)
          var first: String = p(1)
          var last: String = p(2)
          var trans_num: String = p(3)
          var trans_time: String = p(4)
          var category: String = p(5)
          var merchant: String = p(6)
          var amt: String = p(7)
          var merch_lat: String = p(8)
          var merch_long: String = p(9)

          val creationDate: Time = new Time(System.currentTimeMillis())
          return (cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
        }
      }

    val data = kinesis.map(mapFunction)

    tEnv.registerDataStream("transactions", data, 'cc_num, 'first_column, 'last_column, 'trans_num,
      'trans_time, 'category_column, 'merchant_column, 'amt_column, 'merch_lat, 'merch_long)
    //tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
    val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num') "
    val table = tEnv.sqlQuery(query)

    table
      .toRetractStream(TypeInformation.of(classOf[Row]))
      .map(_._2)
      .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125", FileSystem.WriteMode.OVERWRITE)

    table.printSchema()

    table.toRetractStream(TypeInformation.of(classOf[Row])).print()

    env.execute()

    /*
    table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
      FileSystem.WriteMode.OVERWRITE,
      "\n", "|")

    val test = table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2)

    test.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut123", FileSystem.WriteMode.OVERWRITE)

    test.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
      FileSystem.WriteMode.OVERWRITE,
      "\n", "|")

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.common.typeinfo.TypeInformation
    implicit val typeInfo = TypeInformation.of(classOf[Row])

    val ds = table.toRetractStream(TypeInformation.of(classOf[Row]))

    ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15", FileSystem.WriteMode.OVERWRITE,
      "\n", "|")

    tEnv.toRetractStream(table, TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
      FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

    table.distinct().writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      "\n", "|")

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.api.common.typeinfo.TypeInformation

    implicit val typeInfo = TypeInformation.of(classOf[Row])

    table.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      FileSystem.WriteMode.OVERWRITE, "\n", "|")

    table.toRetractStream(TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

    ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

    tEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1), Time.minutes(6))

    tEnv.toRetractStream(table)
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
        FileSystem.WriteMode.OVERWRITE, "\n", "|")

    tEnv.toRetractStream(table, classOf[T])
    */

  }

}

--
Thanks & Regards
Sri Tummala

Re: table toRetractStream missing last record and adding extra column (True)

sri hari kali charan Tummala
Yes, even the delimiter can be replaced. I still have to test what happens when the data itself contains a comma.

table.toRetractStream(TypeInformation.of(classOf[Row]))
  .map(_._2.toString.replaceAll(",", "~"))
  .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125", FileSystem.WriteMode.OVERWRITE)
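
If embedded commas do turn out to be a problem, one alternative sketch (my suggestion, not from the thread) is to quote each field RFC 4180-style instead of swapping the delimiter; Row exposes getArity and getField(i) for this:

// Hypothetical helper: wrap every field in double quotes and escape embedded
// quotes, so commas inside the data survive.
def toCsvLine(row: Row): String =
  (0 until row.getArity).map { i =>
    val field = String.valueOf(row.getField(i))
    "\"" + field.replace("\"", "\"\"") + "\""
  }.mkString(",")

table.toRetractStream(TypeInformation.of(classOf[Row]))
  .map(t => toCsvLine(t._2))
  .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125", FileSystem.WriteMode.OVERWRITE)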

--
Thanks & Regards
Sri Tummala