Re: CsvSink

Posted by Karim Amer on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/CsvSink-tp18974p19031.html

Hi Nico,

I tried to reproduce your code, but registerDataStream keeps failing to register the fields even though I am following your code and the docs.
Here is the error:
[error]  found   : Symbol
[error]  required: org.apache.flink.table.expressions.Expression
[error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
[error]  
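
Looking at the docs again, I suspect the 'A, 'B, 'C syntax relies on the implicit
Symbol-to-Expression conversion that comes with the Scala Table API import
(org.apache.flink.table.api.scala._), which is not in my import list below. A
minimal sketch of what I think should compile (not verified on my end; the object
name and the placeholder tuple stream are made up, "myTable2" and "set" are just
taken from the failing line):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
// assumption: this import brings the Symbol -> Expression implicit into scope
import org.apache.flink.table.api.scala._

object RegisterWithFields {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // placeholder 3-tuple stream, only there to mirror the failing call
    val set = env.fromElements((1, 1L, "a"), (2, 2L, "b"))

    // with the implicits in scope, 'A, 'B, 'C are accepted as Expressions
    tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C)
  }
}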
I think my code snippet was misleading. Here is the full snippet. Changing the name from "table" didn't fix it for me.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row



object datastreamtotableapi {

  case class Calls(a: String, b: String, c: String, d: String, e: String,
                   f: String, g: String, h: String, i: String, j: String,
                   k: String, l: String, m: String, n: String, p: String,
                   q: String, r: String, s: String, t: String, v: String,
                   w: String)

  def main(args: Array[String]) {

    val params = ParameterTool.fromArgs(args)
    val input = params.getRequired("input")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataStream = env.readTextFile(input)

    val namedStream = dataStream.map((value: String) => {
      val columns = value.split(",")
      Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
        columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
        columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
        columns(18), columns(19), columns(20))
    })

    val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")

    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
        override def extractTimestamp(element: Calls): Long = (element.j.concat(element.k)).toLong
      })

    tableEnv.registerDataStream("CDRS", watermarkedStream)
    val results = tableEnv.sqlQuery(
      """
        |SELECT
        | a
        | FROM CDRS
      """.stripMargin)

    val result: Table = results

    val path = "file:///Users/test/1.txt"
    val sink: TableSink[Row] = new CsvTableSink(
      path,                             // output path
      fieldDelim = "|",                 // optional: delimit files by '|'
      numFiles = 1,                     // optional: write to a single file
      writeMode = WriteMode.OVERWRITE)

    result.writeToSink(sink)

    env.execute("this job")
  }
}



On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <[hidden email]> wrote:
Hi Karim,
when I was trying to reproduce your code, I got an exception caused by the
name 'table' being used. After replacing it and completing the job with
some input, I did see the CSV file appear. Also, the job crashed when the
file 1.txt already existed.
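
Presumably that exception comes from "table" being a reserved SQL keyword.
Renaming the table, as in the code below, avoids it; escaping the identifier
in the query might work too - a sketch, assuming the SQL parser accepts
backtick-quoted identifiers (I have not tried this):

  val results = tableEnv.sqlQuery(
    """
      |SELECT A
      |FROM `table`
    """.stripMargin)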

The code I used (running Flink 1.5-SNAPSHOT):

  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
      .assignAscendingTimestamps(_._2)
    tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

    val results = tableEnv.sqlQuery( """
                                       |SELECT
                                       | A,C
                                       | FROM mytable
                                     """.stripMargin)

    val result: Table = results

    val path = "file:///tmp/test/1.txt"
    val sink :TableSink[Row]=   new CsvTableSink(
      path,                             // output path
      fieldDelim = "|",                 // optional: delimit files by '|'
      numFiles = 1,                     // optional: write to a single file
      writeMode = WriteMode.NO_OVERWRITE)

    result.writeToSink(sink)

    env.execute("this job")
  }

  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
    val data = new mutable.MutableList[(Int, Long, String)]
    data.+=((1, 1L, "Hi"))
    data.+=((2, 2L, "Hello"))
    data.+=((3, 2L, "Hello world"))
    data.+=((4, 3L, "Hello world, how are you?"))
    data.+=((5, 3L, "I am fine."))
    data.+=((6, 3L, "Luke Skywalker"))
    env.fromCollection(data)
  }


Nico

On 16/03/18 22:50, karim amer wrote:
> Hi There,
>
>  I am trying to write a CsvSink to disk, but it's not getting written. I
> think the file is getting overwritten or truncated once the stream
> process finishes. Does anyone know why the file is getting overwritten
> or truncated, and how can I fix this?
>
>
> tableEnv.registerDataStream("table", watermarkedStream)
>
> val results = tableEnv.sqlQuery( """
> |SELECT
> | A
> | FROM table
> """.stripMargin)
>
>
>
> val result: Table = results
>
> val path = "file:///path/test/1.txt"
> val sink: TableSink[Row] = new CsvTableSink(
>   path,                             // output path
>   fieldDelim = "|",                 // optional: delimit files by '|'
>   numFiles = 1,                     // optional: write to a single file
>   writeMode = WriteMode.NO_OVERWRITE)
>
> result.writeToSink(sink)
>
> env.execute("this job")
>
>
>
>
> Thanks




--
karim amer