Hi there,

I am trying to write a CsvTableSink to disk, but nothing is getting written. I think the file is getting overwritten or truncated once the stream processing job finishes. Does anyone know why the file is getting overwritten or truncated, and how I can fix this?

Thanks
Hi Karim,
when I was trying to reproduce your code, I got an exception because the name 'table' was being used as a table name. After replacing it and completing the job with some input, I did see the CSV file popping up. Also, the job was crashing when the file 1.txt already existed.

The code I used (running Flink 1.5-SNAPSHOT):

def main(args: Array[String]) {
  // set up the streaming execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tableEnv = TableEnvironment.getTableEnvironment(env)

  val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
    .assignAscendingTimestamps(_._2)
  tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

  val results = tableEnv.sqlQuery(
    """
      |SELECT
      |  A, C
      |FROM mytable
    """.stripMargin)

  val result: Table = results

  val path = "file:///tmp/test/1.txt"
  val sink: TableSink[Row] = new CsvTableSink(
    path,                               // output path
    fieldDelim = "|",                   // optional: delimit fields by '|'
    numFiles = 1,                       // optional: write to a single file
    writeMode = WriteMode.NO_OVERWRITE)

  result.writeToSink(sink)

  env.execute("this job")
}

def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
  val data = new mutable.MutableList[(Int, Long, String)]
  data.+=((1, 1L, "Hi"))
  data.+=((2, 2L, "Hello"))
  data.+=((3, 2L, "Hello world"))
  data.+=((4, 3L, "Hello world, how are you?"))
  data.+=((5, 3L, "I am fine."))
  data.+=((6, 3L, "Luke Skywalker"))
  env.fromCollection(data)
}

Nico

On 16/03/18 22:50, karim amer wrote:
> tableEnv.registerDataStream("table", watermarkedStream)
>
> val results = tableEnv.sqlQuery(
>   """
>     |SELECT
>     |  A
>     |FROM table
>   """.stripMargin)
>
> val result: Table = results
>
> val path = "file:///path/test/1.txt"
> val sink: TableSink[Row] = new CsvTableSink(
>   path,                               // output path
>   fieldDelim = "|",                   // optional: delimit fields by '|'
>   numFiles = 1,                       // optional: write to a single file
>   writeMode = WriteMode.NO_OVERWRITE)
>
> result.writeToSink(sink)
>
> env.execute("this job")
>
> Thanks
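As an aside on the crash Nico mentions when 1.txt already exists: that failure is what WriteMode.NO_OVERWRITE asks for. If replacing old output is acceptable, the Flink-side fix is passing WriteMode.OVERWRITE to the CsvTableSink instead; alternatively, the old file can be removed before the job starts. A hedged sketch of the latter in plain java.io (the helper name is invented, and the path is the local-filesystem part of the "file://" URI):

```scala
import java.io.File

// Remove a previous run's output before starting the job, so that
// WriteMode.NO_OVERWRITE never sees an existing file. This is plain
// java.io, not a Flink API.
object SinkCleanup {
  def deleteIfExists(localPath: String): Boolean =
    new File(localPath).delete() // true if a file was actually removed
}
```

Calling `SinkCleanup.deleteIfExists("/tmp/test/1.txt")` before `env.execute(...)` would make repeated runs idempotent without giving up NO_OVERWRITE's safety against concurrent writers.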
Hi Nico,

I tried to reproduce your code, but registerDataStream keeps failing to register the fields, even though I am following your code and the docs. I think my earlier code snippet was misleading; here is the error from the full snippet. Changing the table name from 'table' didn't fix it:

[error] found   : Symbol
[error] required: org.apache.flink.table.expressions.Expression
[error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C)
[error] import org.apache.flink.streaming.api.scala._

On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <[hidden email]> wrote:

--
karim amer
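For what it's worth, a "found: Symbol / required: ...Expression" message means the Scala compiler could not find an implicit Symbol-to-Expression conversion in scope (in Flink's case such conversions come with the Table API's Scala implicits). A Flink-free sketch of the same mechanism, with every name invented for illustration:

```scala
import scala.language.implicitConversions

// Stand-ins for Flink's Expression type and its Symbol conversion;
// these names are made up and are not Flink's API.
final case class FieldExpr(name: String)

object FieldImplicits {
  // Without this conversion in scope, register(Symbol("A"), Symbol("B"))
  // fails with "found: Symbol, required: FieldExpr" -- the same shape
  // of error as above.
  implicit def symbolToFieldExpr(s: Symbol): FieldExpr = FieldExpr(s.name)
}

object RegisterDemo {
  // Varargs of the required type, like registerDataStream's field list.
  def register(fields: FieldExpr*): Seq[String] = fields.map(_.name).toSeq
}
```

With `import FieldImplicits._`, `RegisterDemo.register(Symbol("A"), Symbol("B"), Symbol("C"))` compiles and yields `Seq("A", "B", "C")`; without the import it is a type error, which is the failure mode seen in the Flink snippet.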
Hi Karim,

I cannot find a method invocation "tableEnv.registerDataStream("

2018-03-20 0:24 GMT+01:00 karim amer <[hidden email]>:
Hi Fabian,

Sorry if I confused you. The first error is from Nico's code, not my code or snippet. I am still having the original problem in my snippet, where it writes a blank CSV file even though I get

[success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM

after running the job.

Cheers,
karim

On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <[hidden email]> wrote:
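One hedged guess about the blank file: file sinks typically buffer their output, and buffered bytes only become visible on flush/close, which for a bounded stream happens when the job finishes. The following is a plain java.io illustration of that effect, not a statement about CsvTableSink internals:

```scala
import java.io.{BufferedWriter, File, FileWriter}

// Write one small record through a buffer and compare the on-disk size
// before and after close(). The file stays empty until the buffer is
// flushed -- which is what an "empty" sink file mid-job can look like.
object BufferDemo {
  def run(path: String): (Long, Long) = {
    val f   = new File(path)
    val out = new BufferedWriter(new FileWriter(f))
    out.write("1|Hi\n")
    val before = f.length() // 0: the record is still in the buffer
    out.close()             // flush + close makes the bytes visible
    val after  = f.length() // now the 5 written bytes are on disk
    (before, after)
  }
}
```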
After switching from sbt to Maven I got these errors. Should I file a bug report?

Error:(63, 37) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
  val namedStream = dataStream.map((value: String) => {

Error:(63, 37) not enough arguments for method map: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls]. Unspecified value parameter evidence$7.
  val namedStream = dataStream.map((value: String) => {

On Tue, Mar 20, 2018 at 9:30 AM, karim amer <[hidden email]> wrote:
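These two messages are the same failure reported twice: `map` on a Scala DataStream takes an implicit evidence parameter (a `TypeInformation[T]`), and none was in scope at the call site. A Flink-free sketch of the pattern, where `Evidence` stands in for `TypeInformation` and every name is invented:

```scala
// Typeclass standing in for Flink's TypeInformation.
trait Evidence[T] { def describe: String }

object TypeInfos {
  // Analogous to the implicits that `import org.apache.flink.api.scala._`
  // (or its streaming variant) brings into scope.
  implicit val intEvidence: Evidence[Int] =
    new Evidence[Int] { def describe: String = "Int" }
}

object EvidenceDemo {
  // Like DataStream.map: without an Evidence[B] in scope, the call site
  // fails with "could not find implicit value for evidence parameter"
  // followed by "not enough arguments for method map".
  def mapWithEvidence[A, B](in: Seq[A])(f: A => B)(implicit ev: Evidence[B]): Seq[B] =
    in.map(f)
}
```

With `import TypeInfos._` in scope, `EvidenceDemo.mapWithEvidence(Seq("1", "2"))(_.toInt)` compiles and yields `Seq(1, 2)`; without it, the compiler produces exactly the pair of errors quoted above.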
To clarify: should I file a bug report about sbt hiding the errors in the previous email?

On Tue, Mar 20, 2018 at 9:44 AM, karim amer <[hidden email]> wrote:
Never mind, my bad. After adding

import org.apache.flink.api.scala._

these errors went away, and I still have the original problem.

On Tue, Mar 20, 2018 at 11:04 AM, karim amer <[hidden email]> wrote:
Here is the output after fixing the Scala issues:

https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c

On Tue, Mar 20, 2018 at 11:39 AM, karim amer <[hidden email]> wrote:
Never mind, I found the error, and it has nothing to do with Flink. Sorry.

On Tue, Mar 20, 2018 at 12:12 PM, karim amer <[hidden email]> wrote:
Great, thanks for reporting back!

Best,
Fabian

2018-03-20 22:40 GMT+01:00 karim amer <[hidden email]>: