table toRetractStream missing last record and adding extra column (True)

sri hari kali charan Tummala
Hi All, 

I am trying to write a toRetractStream to CSV, which is kind of working OK, but I get extra values like True followed by my output data values.

Question 1:
I don't want "true" in my output data. How can I achieve this?


Question 2:
In the output file (CSV), I am missing data in the last line. Is toRetractStream closing before writing to the file?

Screen Shot attached

val data =
val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)
tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])

Thanks & Regards
Sri Tummala

taher koitawala
Looks like you need a window.

sri hari kali charan Tummala
A window for question 1, question 2, or both?


Hequn Cheng
Hi Sri,

You can use a map to filter out the "true" flag, i.e.,
Note that it's OK to remove the "true" flag for DISTINCT because it does not generate updates. For other queries that contain updates, such as a non-window GROUP BY, we should not filter out the flag, or the result will be incorrect.
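To see why the flag matters, here is a small plain-Scala simulation (not Flink code; the object and helper names are hypothetical). A flag of true means "add this row" and false means "retract a previously emitted row". materialize applies the messages the way a flag-aware sink would, while dropFlags mimics map(_._2) by treating every message as an insert:

```scala
object RetractSemantics {
  // A retract message: true = add the row, false = retract an earlier row.
  type Msg[R] = (Boolean, R)

  // Applies add/retract messages to a multiset (row -> count),
  // the way a sink that honors the flag would.
  def materialize[R](msgs: Seq[Msg[R]]): Map[R, Int] =
    msgs.foldLeft(Map.empty[R, Int]) { case (acc, (isAdd, row)) =>
      val n = acc.getOrElse(row, 0) + (if (isAdd) 1 else -1)
      if (n == 0) acc - row else acc.updated(row, n)
    }

  // What map(_._2) effectively does: ignore the flag, treat everything as an insert.
  def dropFlags[R](msgs: Seq[Msg[R]]): Map[R, Int] =
    materialize(msgs.map { case (_, row) => (true, row) })
}
```

For insert-only output (as with DISTINCT) both views agree. For an updating aggregate such as a per-cc_num count, where the old count is retracted and the new one added, dropping the flags leaves the stale row ("c1", 1) behind, counted twice.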

Question 2:
I can't reproduce this problem in my local environment. Maybe there is something wrong with the source data?

Best, Hequn

sri hari kali charan Tummala
Question 1:

I am trying to convert a Tuple2[Boolean, Row] to a Row using a map function, but I get an error asking me for InferedR. What is InferedR in Flink?
  val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row],AnyVal] =
new MapFunction[tuple.Tuple2[Boolean, Row],AnyVal]() {
override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
/*override def map(t: tuple.Tuple2[Boolean, Row], collector: Collector[Object]): Unit = {
tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
When I try this instead, I get a different type error:

Error:(143, 74) type mismatch;
 found   : org.apache.flink.api.common.functions.MapFunction[[scala.Boolean,org.apache.flink.types.Row],AnyVal]
 required: org.apache.flink.api.common.functions.MapFunction[[java.lang.Boolean,org.apache.flink.types.Row],?]
    tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
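Judging from the error message, toRetractStream(table, classOf[Row]) here produces Java Tuple2[java.lang.Boolean, Row] elements, while Boolean written in Scala source means scala.Boolean, so the function's input type does not match. The declared output type AnyVal also cannot hold a Row. A minimal sketch of a matching function, assuming the Java DataStream API implied by the error; the class name DropRetractFlag is hypothetical:

```scala
import java.lang.{Boolean => JBoolean}

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.types.Row

// Drops the retract flag (field f0) and keeps the Row (field f1).
// The flag is typed as java.lang.Boolean to match the Java Tuple2 in the error,
// and the output type is Row rather than AnyVal.
class DropRetractFlag extends MapFunction[JTuple2[JBoolean, Row], Row] {
  override def map(t: JTuple2[JBoolean, Row]): Row = t.f1
}
```

It would be applied as tEnv.toRetractStream(table, classOf[Row]).map(new DropRetractFlag). With the Scala StreamTableEnvironment the elements are Scala tuples instead, and tEnv.toRetractStream[Row](table).map(_._2) does the same job.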

Question 2:
I don't have any source data issues. Reproducing this for testing is simple:

create a Kinesis stream
run the producer

then run the consumer:


sri hari kali charan Tummala
Amazing, all issues resolved in one go, thanks Cheng! One issue though: I can't write map(_._2) to CSV. It looks like that isn't supported right now, so it has to be a text file.

Below is the full code in Scala, if anyone wants it.

The Git code is here:


import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.google.gson.Gson
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TransactionScalaTest {

  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Get AWS credentials
    val credentialsProvider = new DefaultAWSCredentialsProviderChain
    val credentials = credentialsProvider.getCredentials

    // Configure Flink Kinesis consumer
    val consumerConfig = new Properties
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

    // Create Kinesis stream
    val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(), consumerConfig))

    // Parse each JSON record into ten string columns.
    // TransactionJsonClass is the author's own POJO and is not shown in this thread.
    val mapFunction: MapFunction[String, (String, String, String, String, String, String, String, String, String, String)] =
      new MapFunction[String, (String, String, String, String, String, String, String, String, String, String)]() {

        override def map(s: String): (String, String, String, String, String, String, String, String, String, String) = {

          val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

          val csvData = data.getCc_num + "," +
            data.getFirst + "," +
            data.getLast + "," +
            data.getTrans_num + "," +
            data.getTrans_time + "," +
            data.getCategory + "," +
            data.getMerchant + "," +
            data.getAmt + "," +
            data.getMerch_lat + "," +
            data.getMerch_long

          // limit -1 keeps trailing empty fields; a value that itself contains a comma would still shift the columns
          val p: Array[String] = csvData.split(",", -1)

          // (cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
          (p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9))
        }
      }

    val data = kinesis.map(mapFunction)

    tEnv.registerDataStream("transactions", data, 'cc_num, 'first_column, 'last_column, 'trans_num,
      'trans_time, 'category_column, 'merchant_column, 'amt_column, 'merch_lat, 'merch_long)

    val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num') "
    val table = tEnv.sqlQuery(query)

    // Drop the retract flag. This is safe here only because SELECT DISTINCT does not generate updates.
    val rows: DataStream[Row] = tEnv.toRetractStream[Row](table).map(_._2)

    // writeAsCsv only accepts tuple-typed streams, so the Row stream is written as text
    rows.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
      FileSystem.WriteMode.NO_OVERWRITE)

    // nothing runs until the job is submitted
    env.execute()
  }
}




sri hari kali charan Tummala
Yes, even the delimiter can be replaced. I still need to test what happens if the data itself contains a comma.

.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125", FileSystem.WriteMode.OVERWRITE)
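On the open question of commas inside the data: one option, sketched here as an assumption rather than anything Flink provides, is to build each output line yourself and quote any field that contains the delimiter, a double quote, or a line break (RFC 4180-style quoting). The object name DelimitedLine is hypothetical, and the code is plain Scala with no Flink dependency:

```scala
object DelimitedLine {
  // Joins field values with `delim`, quoting only the fields that need it.
  def format(fields: Seq[Any], delim: String): String =
    fields.map(quote(_, delim)).mkString(delim)

  // Null becomes an empty field; a field containing the delimiter, a quote,
  // or a line break is wrapped in quotes, with inner quotes doubled.
  private def quote(value: Any, delim: String): String = {
    val s = if (value == null) "" else value.toString
    val needsQuotes =
      s.contains(delim) || s.contains("\"") || s.contains("\n") || s.contains("\r")
    if (needsQuotes) "\"" + s.replace("\"", "\"\"") + "\"" else s
  }
}
```

Applied to the Row stream, this could look like rows.map(r => DelimitedLine.format((0 until r.getArity).map(i => r.getField(i)), "|")).writeAsText(path, FileSystem.WriteMode.OVERWRITE), where rows and path stand in for your own stream and output location. With "|" as the delimiter, a comma inside a value is left unquoted, which is the main benefit of switching delimiters.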

