Posted by Nico Kruber on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/CsvSink-tp18974p19017.html
Hi Karim,
when I tried to reproduce your code, I got an exception caused by the
table name 'table' (TABLE is a reserved keyword in SQL). After renaming
it and completing the job with some input, I did see the CSV file being
written. Also, the job crashed when the file 1.txt already existed, which
is what WriteMode.NO_OVERWRITE is supposed to do.
The code I used (running Flink 1.5-SNAPSHOT):
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

import scala.collection.mutable

object CsvSinkJob {

  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // use the Long field (_2) as an ascending event-time timestamp
    val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
      .assignAscendingTimestamps(_._2)

    // register under "mytable": "table" is a reserved SQL keyword
    tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

    val result: Table = tableEnv.sqlQuery(
      """
        |SELECT
        |  A, C
        |FROM mytable
      """.stripMargin)

    val path = "file:///tmp/test/1.txt"
    val sink: TableSink[Row] = new CsvTableSink(
      path,                               // output path
      fieldDelim = "|",                   // optional: delimit fields by '|'
      numFiles = 1,                       // optional: write to a single file
      writeMode = WriteMode.NO_OVERWRITE) // optional: fail if the file exists

    result.writeToSink(sink)
    env.execute("this job")
  }

  // small bounded test input: (id, timestamp, text)
  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
    val data = new mutable.MutableList[(Int, Long, String)]
    data += ((1, 1L, "Hi"))
    data += ((2, 2L, "Hello"))
    data += ((3, 2L, "Hello world"))
    data += ((4, 3L, "Hello world, how are you?"))
    data += ((5, 3L, "I am fine."))
    data += ((6, 3L, "Luke Skywalker"))
    env.fromCollection(data)
  }
}
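Since a rerun fails as soon as 1.txt exists, the simplest fix is probably to build the sink with WriteMode.OVERWRITE instead. If you would rather keep NO_OVERWRITE, a minimal plain-Scala sketch is to delete the stale output before calling env.execute. The names OutputCleanup and prepareOutputPath are made up for illustration, and the sketch assumes a local file:// path. It removes either a single file or a whole output directory, in case the sink writes a directory (e.g. without numFiles = 1 and with parallelism > 1).

```scala
import java.io.{File, IOException}
import java.net.URI

// Hypothetical helper, not part of Flink: clears old sink output so that a
// sink created with WriteMode.NO_OVERWRITE can be rerun.
object OutputCleanup {

  // Deletes a file, or a directory together with everything below it
  // (children first, because a directory must be empty to be deleted).
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      val children = f.listFiles()
      if (children != null) children.foreach(deleteRecursively)
    }
    if (!f.delete()) throw new IOException(s"Could not delete $f")
  }

  // Assumes `uri` is a local file URI such as "file:///tmp/test/1.txt".
  // Returns true if something was deleted, false if nothing existed.
  def prepareOutputPath(uri: String): Boolean = {
    val target = new File(new URI(uri))
    if (!target.exists()) false
    else {
      deleteRecursively(target)
      true
    }
  }
}
```

You would call OutputCleanup.prepareOutputPath(path) right before env.execute("this job"). It deletes data unconditionally, so only point it at output you are happy to lose.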
Nico
On 16/03/18 22:50, karim amer wrote:
> Hi There,
>
> I am trying to write a CSV sink to disk, but it's not getting written. I
> think the file is getting overwritten or truncated once the stream
> process finishes. Does anyone know why the file is getting overwritten
> or truncated and how I can fix this?
>
> tableEnv.registerDataStream("table", watermarkedStream)
>
> val results = tableEnv.sqlQuery( """
> |SELECT
> | A
> | FROM table
> """.stripMargin)
>
> val result: Table = results
>
> val path = "file:///path/test/1.txt"
> val sink :TableSink[Row]= new CsvTableSink(
> path, // output path
> fieldDelim = "|", // optional: delimit files by '|'
> numFiles = 1, // optional: write to a single file
> writeMode = WriteMode.NO_OVERWRITE)
>
> result.writeToSink(sink)
>
> env.execute("this job")
>
> Thanks