CsvSink


CsvSink

Karim Amer
Hi there,

 I am trying to write the result of a query to disk with a CsvTableSink, but nothing gets written. I think the file is being overwritten or truncated once the streaming job finishes. Does anyone know why this happens and how I can fix it?

tableEnv.registerDataStream("table", watermarkedStream)

val results = tableEnv.sqlQuery("""
  |SELECT
  | A
  | FROM table
  """.stripMargin)

val result: Table = results

val path = "file:///path/test/1.txt"
val sink: TableSink[Row] = new CsvTableSink(
  path,              // output path
  fieldDelim = "|",  // optional: delimit files by '|'
  numFiles = 1,      // optional: write to a single file
  writeMode = WriteMode.NO_OVERWRITE)

result.writeToSink(sink)
env.execute("this job")



Thanks

Re: CsvSink

Nico Kruber
Hi Karim,
When I tried to reproduce your code, I got an exception because the
name 'table' is used (TABLE is a reserved keyword in SQL). After renaming
it and completing the job with some input, I did see the CSV file appear.
Also, the job crashed when the file 1.txt already existed, which is the
expected behaviour of WriteMode.NO_OVERWRITE.
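The crash on an existing 1.txt comes from the write mode: with NO_OVERWRITE the output path must not exist yet, while OVERWRITE replaces the previous content. The following is only a plain-Scala sketch of those two semantics using java.nio, not Flink's FileSystem code; the object WriteModeSketch and its Mode values are hypothetical stand-ins for WriteMode.NO_OVERWRITE and WriteMode.OVERWRITE.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

object WriteModeSketch {
  // Hypothetical stand-ins for Flink's WriteMode.NO_OVERWRITE / WriteMode.OVERWRITE.
  sealed trait Mode
  case object NoOverwrite extends Mode
  case object Overwrite extends Mode

  // Writes one line per element to `path`, honouring the chosen mode.
  def writeLines(path: Path, lines: Seq[String], mode: Mode): Unit = {
    val options: Seq[StandardOpenOption] = mode match {
      // CREATE_NEW throws FileAlreadyExistsException if the file already exists.
      case NoOverwrite => Seq(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
      // CREATE + TRUNCATE_EXISTING replaces whatever the file contained before.
      case Overwrite =>
        Seq(StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE)
    }
    val bytes = lines.map(_ + "\n").mkString.getBytes(StandardCharsets.UTF_8)
    Files.write(path, bytes, options: _*)
  }
}
```

Running the job twice against the same path with NO_OVERWRITE therefore fails on the second run, whereas OVERWRITE silently replaces the file, which can look like truncation if the second run produces no rows.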

The code I used (running Flink 1.5-SNAPSHOT):

import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._ // implicit conversions such as 'A => Expression
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

import scala.collection.mutable

object CsvSinkJob { // object name is arbitrary

  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
      .assignAscendingTimestamps(_._2)
    // "mytable" instead of "table", which is a reserved SQL keyword
    tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

    val results = tableEnv.sqlQuery("""
      |SELECT
      | A,C
      | FROM mytable
      """.stripMargin)

    val result: Table = results

    val path = "file:///tmp/test/1.txt"
    val sink: TableSink[Row] = new CsvTableSink(
      path,                               // output path
      fieldDelim = "|",                   // optional: delimit files by '|'
      numFiles = 1,                       // optional: write to a single file
      writeMode = WriteMode.NO_OVERWRITE) // fails if 1.txt already exists

    result.writeToSink(sink)

    env.execute("this job")
  }

  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
    val data = new mutable.MutableList[(Int, Long, String)]
    data += ((1, 1L, "Hi"))
    data += ((2, 2L, "Hello"))
    data += ((3, 2L, "Hello world"))
    data += ((4, 3L, "Hello world, how are you?"))
    data += ((5, 3L, "I am fine."))
    data += ((6, 3L, "Luke Skywalker"))
    env.fromCollection(data)
  }
}
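For this sample data, SELECT A,C with fieldDelim = "|" and numFiles = 1 should produce one line per row with the two fields joined by the delimiter. The plain-Scala sketch below approximates that formatting without Flink; formatRow and selectAC are hypothetical helpers, not the CsvTableSink API, and the real sink's handling of nulls may differ.

```scala
object CsvLineSketch {
  // Sample rows from get3TupleDataStream: (A, B, C).
  val data: Seq[(Int, Long, String)] = Seq(
    (1, 1L, "Hi"),
    (2, 2L, "Hello"),
    (3, 2L, "Hello world"),
    (4, 3L, "Hello world, how are you?"),
    (5, 3L, "I am fine."),
    (6, 3L, "Luke Skywalker"))

  // Joins the projected fields with the delimiter; nulls become empty strings.
  def formatRow(fields: Seq[Any], delim: String): String =
    fields.map(f => if (f == null) "" else f.toString).mkString(delim)

  // Emulates SELECT A, C followed by formatting with the given field delimiter.
  def selectAC(delim: String): Seq[String] =
    data.map { case (a, _, c) => formatRow(Seq(a, c), delim) }
}
```

Using "|" rather than "," keeps the comma inside "Hello world, how are you?" from being mistaken for a field separator. With a parallelism above 1 the line order in the file is not guaranteed.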


Nico

On 16/03/18 22:50, karim amer wrote:


Re: CsvSink

Karim Amer
Hi Nico,

I tried to reproduce your code, but registerDataStream keeps failing to register the fields even though I am following your code and the docs.
Here is the error:
[error]  found   : Symbol
[error]  required: org.apache.flink.table.expressions.Expression
[error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
[error]  
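This compiler error means no implicit conversion from a Scala Symbol such as 'A to the Table API's Expression type is in scope. In the Flink Scala Table API that conversion is normally brought in by import org.apache.flink.table.api.scala._, so a registerDataStream call with 'A, 'B, 'C fails with exactly this message when that import is missing. The plain-Scala sketch below reproduces the mechanism with hypothetical Expr and ExprImplicits types standing in for the Flink ones.

```scala
import scala.language.implicitConversions

object SymbolConversionSketch {
  // Hypothetical stand-in for Flink's Expression type.
  final case class Expr(name: String)

  // Hypothetical stand-in for the implicits in org.apache.flink.table.api.scala._
  object ExprImplicits {
    implicit def symbolToExpr(s: Symbol): Expr = Expr(s.name)
  }

  // Mimics a registerDataStream-style signature that takes Expr varargs.
  def register(table: String, fields: Expr*): String =
    s"$table(${fields.map(_.name).mkString(", ")})"

  def demo(): String = {
    // Without this import, the call below fails with "found: Symbol, required: Expr".
    import ExprImplicits._
    register("mytable", Symbol("A"), Symbol("B"), Symbol("C"))
  }
}
```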
I think my code snippet was misleading, so here is the full program. Changing the name from 'table' didn't fix it for me.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

object datastreamtotableapi {

  case class Calls(a: String, b: String, c: String, d: String, e: String,
                   f: String, g: String, h: String, i: String, j: String,
                   k: String, l: String, m: String, n: String, p: String,
                   q: String, r: String, s: String, t: String, v: String,
                   w: String)

  def main(args: Array[String]) {
    val params = ParameterTool.fromArgs(args)
    val input = params.getRequired("input")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataStream = env.readTextFile(input)

    val namedStream = dataStream.map((value: String) => {
      val columns = value.split(",")
      Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
        columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
        columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
        columns(18), columns(19), columns(20))
    })

    val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")

    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
        override def extractTimestamp(element: Calls): Long =
          (element.j.concat(element.k)).toLong
      })

    tableEnv.registerDataStream("CDRS", watermarkedStream)
    val results = tableEnv.sqlQuery("""
      |SELECT
      | a
      | FROM CDRS
      """.stripMargin)

    val result: Table = results

    val path = "file:///Users/test/1.txt"
    val sink: TableSink[Row] = new CsvTableSink(
      path,              // output path
      fieldDelim = "|",  // optional: delimit files by '|'
      numFiles = 1,      // optional: write to a single file
      writeMode = WriteMode.OVERWRITE)

    result.writeToSink(sink)

    env.execute("this job")
  }
}
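One possible reason for an empty file lies in the program rather than in CsvTableSink: filter(_.j == " ").filter(_.k == " ") keeps only records whose j and k fields are exactly one space, and for such a record extractTimestamp would call "  ".toLong, which throws NumberFormatException. So either no record survives the filter and the sink writes nothing, or the job fails. The plain-Scala sketch below replays that logic on two invented records; the reduced Rec type is hypothetical, and the guess that the filter was meant to drop blank fields (nonBlankFilter) is an assumption.

```scala
object FilterSketch {
  // Hypothetical reduced record: only the two fields used for the timestamp.
  final case class Rec(j: String, k: String)

  // Same predicate as in the program: keeps only single-space fields.
  def originalFilter(r: Rec): Boolean = r.j == " " && r.k == " "

  // A guess at the intended predicate: drop records whose j or k is blank.
  def nonBlankFilter(r: Rec): Boolean = r.j.trim.nonEmpty && r.k.trim.nonEmpty

  // Same timestamp logic as extractTimestamp: concatenate j and k, parse as Long.
  def timestamp(r: Rec): Long = r.j.concat(r.k).toLong

  // Invented sample input: one complete record and one with blank fields.
  val sample: Seq[Rec] = Seq(Rec("15210", "00000"), Rec(" ", " "))
}
```

With the original predicate only the blank record passes, and its timestamp cannot be parsed; with the inverted predicate the complete record passes and yields 1521000000L.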



On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <[hidden email]> wrote:
--
karim amer 


Re: CsvSink

Fabian Hueske-2
Hi Karim,

I cannot find the method invocation "tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )" from the error message anywhere in your example.
It would help if you kept the error message and the code consistent; otherwise it's not possible to figure out what's going on.

Best, Fabian

2018-03-20 0:24 GMT+01:00 karim amer <[hidden email]>:

Re: CsvSink

Karim Amer
Hi Fabian,
Sorry if I confused you. The first error comes from Nico's code, not from my code or snippet.
I still have the original problem with my snippet: it writes a blank CSV file, even though I get
[success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
after running the job.

Cheers,
karim

On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <[hidden email]> wrote:

Re: CsvSink

Karim Amer
After switching from sbt to Maven, I got these errors:
Error:(63, 37) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
    val namedStream = dataStream.map((value:String) => {



Error:(63, 37) not enough arguments for method map: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
Unspecified value parameter evidence$7.
    val namedStream = dataStream.map((value:String) => {


Should I file a bug report?
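The error says map needs an implicit TypeInformation[Calls] that the compiler cannot find. In the Flink Scala DataStream API, map declares a TypeInformation context bound on its result type, and the implicit is normally produced by the createTypeInformation macro that import org.apache.flink.streaming.api.scala._ brings into scope, so a missing or shadowed import is a common cause. The plain-Scala sketch below imitates that mechanism with a hypothetical TypeInfo type class and MiniStream type; it is not Flink code.

```scala
object EvidenceSketch {
  // Hypothetical stand-in for Flink's TypeInformation[T].
  trait TypeInfo[T] { def describe: String }

  final case class Calls(a: String, b: String)

  // Hypothetical stand-in for the implicits that org.apache.flink.streaming.api.scala._ provides.
  object Implicits {
    implicit val callsInfo: TypeInfo[Calls] = new TypeInfo[Calls] { def describe: String = "Calls" }
  }

  // Mimics DataStream.map, whose result type R needs implicit TypeInfo evidence.
  final case class MiniStream[T](items: Seq[T]) {
    def map[R: TypeInfo](f: T => R): MiniStream[R] = MiniStream(items.map(f))
  }

  def demo(): (Seq[Calls], String) = {
    // Without this import, the map call below fails to compile with an error like
    // "could not find implicit value for evidence parameter of type TypeInfo[Calls]".
    import Implicits._
    val parsed = MiniStream(Seq("x,y", "u,v")).map { line =>
      val cols = line.split(",")
      Calls(cols(0), cols(1))
    }
    (parsed.items, implicitly[TypeInfo[Calls]].describe)
  }
}
```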

On Tue, Mar 20, 2018 at 9:30 AM, karim amer <[hidden email]> wrote:

Re: CsvSink

Karim Amer
To clarify: should I file a bug report against sbt for hiding the errors shown in my previous email?

On Tue, Mar 20, 2018 at 9:44 AM, karim amer <[hidden email]> wrote:
After switching to Maven from Sbt I got these errors
Error:(63, 37) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
    val namedStream = dataStream.map((value:String) => {



Error:(63, 37) not enough arguments for method map: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
Unspecified value parameter evidence$7.
    val namedStream = dataStream.map((value:String) => {


Should i file a bug report  ?

On Tue, Mar 20, 2018 at 9:30 AM, karim amer <[hidden email]> wrote:
Hi Fabian
Sorry if i confused you The first error is from Nico's code Not my code or  snippet
I am still having the original problem in my snippet where it's writing a blank csv file even though i get
[success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
After running the job

Cheers,
karim

On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <[hidden email]> wrote:
Hi Karim,

I cannot find a method invocation "tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )" as shown in the error message in your example.
It would help if you would keep error message and code consistent. Otherwise it's not possible to figure out what's going on.

Best, Fabian

2018-03-20 0:24 GMT+01:00 karim amer <[hidden email]>:
Hi Nico,

I tried to reproduce your code but registerDataStream keeps failing to register the fields even though i am following your code and the Docs.
here is the error
[error]  found   : Symbol
[error]  required: org.apache.flink.table.expressions.Expression
[error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
[error]  
I think my code snippet was misleading. Here is the full snippet Changing the name from table didn't fix it for

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

object datastreamtotableapi {

  case class Calls(a: String, b: String, c: String, d: String, e: String,
                   f: String, g: String, h: String, i: String, j: String,
                   k: String, l: String, m: String, n: String, p: String,
                   q: String, r: String, s: String, t: String, v: String,
                   w: String)

  def main(args: Array[String]) {

    val params = ParameterTool.fromArgs(args)
    val input = params.getRequired("input")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataStream = env.readTextFile(input)

    val namedStream = dataStream.map((value: String) => {
      val columns = value.split(",")
      Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
        columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
        columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
        columns(18), columns(19), columns(20))
    })

    val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")

    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
        override def extractTimestamp(element: Calls): Long =
          (element.j.concat(element.k)).toLong
      })

    tableEnv.registerDataStream("CDRS", watermarkedStream)
    val results = tableEnv.sqlQuery("""
      |SELECT
      | a
      | FROM CDRS
      """.stripMargin)

    val result: Table = results

    val path = "file:///Users/test/1.txt"
    val sink: TableSink[Row] = new CsvTableSink(
      path,             // output path
      fieldDelim = "|", // optional: delimit files by '|'
      numFiles = 1,     // optional: write to a single file
      writeMode = WriteMode.OVERWRITE)

    result.writeToSink(sink)

    env.execute("this job")
  }
}
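The thread never says what the non-Flink error turned out to be, so the following is only a guess. As written, cleanedStream keeps only records whose j and k fields are exactly a single space, and the timestamp extractor then parses j.concat(k), i.e. two spaces, as a Long, which would throw a NumberFormatException. A job that completes successfully but leaves an empty file is consistent with no input line passing that filter. The sketch below assumes the intent was the opposite (drop records with blank j/k and keep those whose concatenation parses as a number). CallTs, originalKeep, timestampOf and intendedKeep are hypothetical names, and only the two relevant fields of Calls are modelled.

```scala
import scala.util.Try

// Hypothetical reduction of the thread's Calls record to the two fields
// that feed the event timestamp.
final case class CallTs(j: String, k: String)

object FilterCheck {
  // Mirrors filter(_.j == " ").filter(_.k == " "):
  // keeps ONLY records whose j and k are a single space.
  def originalKeep(c: CallTs): Boolean = c.j == " " && c.k == " "

  // Mirrors extractTimestamp, but returns None instead of throwing.
  def timestampOf(c: CallTs): Option[Long] = Try(c.j.concat(c.k).toLong).toOption

  // Assumed intent: keep records with non-blank j/k whose concatenation parses.
  def intendedKeep(c: CallTs): Boolean =
    c.j.trim.nonEmpty && c.k.trim.nonEmpty && timestampOf(c).isDefined

  val rows: Seq[CallTs] = Seq(CallTs("2018", "0320"), CallTs(" ", " "), CallTs("12", "34"))

  def main(args: Array[String]): Unit = {
    // The original filter lets through only the record the extractor cannot parse.
    println(rows.filter(originalKeep).map(timestampOf)) // List(None)
    println(rows.filter(intendedKeep).map(timestampOf)) // List(Some(20180320), Some(1234))
  }
}
```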



On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <[hidden email]> wrote:
Hi Karim,
when I was trying to reproduce your code, I got an exception because of
the name 'table'. After replacing it and completing the job with some
input, I did see the CSV file being written. Also, the job crashed
when the file 1.txt already existed.

The code I used (running Flink 1.5-SNAPSHOT):

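  // Imports this snippet needs in the enclosing object
  import org.apache.flink.core.fs.FileSystem.WriteMode
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.{Table, TableEnvironment}
  import org.apache.flink.table.api.scala._ // implicit Symbol => Expression for 'A, 'B, 'C
  import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
  import org.apache.flink.types.Row
  import scala.collection.mutable

  def main(args: Array[String]) {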
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
      .assignAscendingTimestamps(_._2)
    tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

    val results = tableEnv.sqlQuery( """
                                       |SELECT
                                       | A,C
                                       | FROM mytable
                                     """.stripMargin)

    val result: Table = results

    val path = "file:///tmp/test/1.txt"
    val sink :TableSink[Row]=   new CsvTableSink(
      path,                             // output path
      fieldDelim = "|",                 // optional: delimit files by '|'
      numFiles = 1,                     // optional: write to a single file
      writeMode = WriteMode.NO_OVERWRITE)

    result.writeToSink(sink)

    env.execute("this job")
  }

  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
    val data = new mutable.MutableList[(Int, Long, String)]
    data.+=((1, 1L, "Hi"))
    data.+=((2, 2L, "Hello"))
    data.+=((3, 2L, "Hello world"))
    data.+=((4, 3L, "Hello world, how are you?"))
    data.+=((5, 3L, "I am fine."))
    data.+=((6, 3L, "Luke Skywalker"))
    env.fromCollection(data)
  }


Nico
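Nico's remark that the job crashed when 1.txt already existed matches the meaning of Flink's FileSystem.WriteMode: NO_OVERWRITE only creates the file if nothing exists at that path, while OVERWRITE replaces whatever is there. The sketch below is a plain-JVM analogy of those two modes using java.nio; it is not how Flink's file sinks are implemented, and Mode, NoOverwrite, Overwrite and write are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, OpenOption, Path, StandardOpenOption}
import scala.util.Try

object WriteModeAnalogy {
  // Hypothetical stand-ins for WriteMode.NO_OVERWRITE / WriteMode.OVERWRITE.
  sealed trait Mode
  case object NoOverwrite extends Mode
  case object Overwrite extends Mode

  // NoOverwrite fails if the file already exists (FileAlreadyExistsException);
  // Overwrite creates the file if needed and truncates any existing content.
  def write(path: Path, text: String, mode: Mode): Try[Path] = Try {
    val options: Seq[OpenOption] = mode match {
      case NoOverwrite => Seq(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
      case Overwrite =>
        Seq(StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE)
    }
    Files.write(path, text.getBytes(StandardCharsets.UTF_8), options: _*)
  }

  def main(args: Array[String]): Unit = {
    val file = Files.createTempDirectory("writemode").resolve("1.txt")
    println(write(file, "first", NoOverwrite).isSuccess)  // true: file did not exist yet
    println(write(file, "second", NoOverwrite).isFailure) // true: file already exists
    println(write(file, "third", Overwrite).isSuccess)    // true: old content replaced
    println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8)) // third
  }
}
```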

On 16/03/18 22:50, karim amer wrote:
> Hi There,
>
>  I am trying to write a CSV sink to disk, but nothing is getting written. I
> think the file is getting overwritten or truncated once the stream
> process finishes. Does anyone know why the file is getting overwritten
> or truncated and how I can fix this?
>
>
> tableEnv.registerDataStream("table", watermarkedStream)
>
> val results = tableEnv.sqlQuery( """
> |SELECT
> | A
> | FROM table
> """.stripMargin)
>
>
>
> val result: Table = results
>
> val path = "file:///path/test/1.txt"
> val sink :TableSink[Row]=   new CsvTableSink(
>   path, // output path
> fieldDelim = "|", // optional: delimit files by '|'
> numFiles = 1, // optional: write to a single file
> writeMode = WriteMode.NO_OVERWRITE)
>
> result.writeToSink(sink)
>
> env.execute("this job")
>
>
>
>
> Thanks




--
karim amer 


Re: CsvSink

Karim Amer
Never mind: after adding
import org.apache.flink.api.scala._
these errors went away, but I still have the original problem.
Sorry, my bad.
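The "could not find implicit value for evidence parameter of type ...TypeInformation[...Calls]" errors that this import fixed come from the Scala DataStream API's map, which requires an implicit TypeInformation for its result type (a context bound, visible as "implicit evidence$7" in the error). The org.apache.flink.api.scala package object supplies that value through an implicit createTypeInformation macro, so when nothing providing it is in scope the compiler reports the missing evidence parameter. The sketch below mimics that shape with hypothetical stand-ins (ToyTypeInfo, ToyStream, ToyTypeInfoImplicits and EvidenceDemo are invented, not Flink classes), using only the Scala standard library.

```scala
// Hypothetical stand-in for Flink's TypeInformation[T]; NOT the real class.
trait ToyTypeInfo[T] { def label: String }

// Plays the role of the package object behind "import org.apache.flink.api.scala._":
// a catch-all implicit, loosely like Flink's createTypeInformation macro.
object ToyTypeInfoImplicits {
  implicit def createToyTypeInfo[T]: ToyTypeInfo[T] =
    new ToyTypeInfo[T] { val label: String = "derived" }
}

// Hypothetical stand-in for DataStream[T].
final case class ToyStream[T](elems: Seq[T], label: String) {
  // Like DataStream.map, the result type R needs implicit evidence (a context bound).
  def map[R: ToyTypeInfo](f: T => R): ToyStream[R] =
    ToyStream(elems.map(f), implicitly[ToyTypeInfo[R]].label)
}

object EvidenceDemo {
  // Remove this import and the map call below no longer compiles: the compiler
  // cannot find an implicit ToyTypeInfo[Call] for map's evidence parameter.
  import ToyTypeInfoImplicits._

  final case class Call(a: String, b: String)

  val calls: ToyStream[Call] =
    ToyStream(Seq("x,y", "p,q"), "String").map { (line: String) =>
      val columns = line.split(",")
      Call(columns(0), columns(1))
    }
}
```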


Re: CsvSink

Karim Amer
Here is the output after fixing the Scala issues:

https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c


Re: CsvSink

Karim Amer
Never mind, I found the error, and it has nothing to do with Flink.
Sorry!


Re: CsvSink

Fabian Hueske-2
Great, thanks for reporting back!

Best, Fabian

2018-03-20 22:40 GMT+01:00 karim amer <[hidden email]>:
Never mind I found the error and has nothing to do with flink.
Sorry

On Tue, Mar 20, 2018 at 12:12 PM, karim amer <[hidden email]> wrote:
here is the output after fixing the scala issues

https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c

On Tue, Mar 20, 2018 at 11:39 AM, karim amer <[hidden email]> wrote:
Never mind after importing
import org.apache.flink.api.scala._
theses errors went away and i still have the original problem.
Sorry my bad

On Tue, Mar 20, 2018 at 11:04 AM, karim amer <[hidden email]> wrote:
To clarify should i file a bug report on sbt hiding the errors in the previous email ?

On Tue, Mar 20, 2018 at 9:44 AM, karim amer <[hidden email]> wrote:
After switching to Maven from Sbt I got these errors
Error:(63, 37) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
    val namedStream = dataStream.map((value:String) => {



Error:(63, 37) not enough arguments for method map: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
Unspecified value parameter evidence$7.
    val namedStream = dataStream.map((value:String) => {


Should i file a bug report  ?

On Tue, Mar 20, 2018 at 9:30 AM, karim amer <[hidden email]> wrote:
Hi Fabian
Sorry if i confused you The first error is from Nico's code Not my code or  snippet
I am still having the original problem in my snippet where it's writing a blank csv file even though i get
[success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
After running the job

Cheers,
karim

On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske <[hidden email]> wrote:
Hi Karim,

I cannot find a method invocation "tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )" as shown in the error message in your example.
It would help if you would keep error message and code consistent. Otherwise it's not possible to figure out what's going on.

Best, Fabian

2018-03-20 0:24 GMT+01:00 karim amer <[hidden email]>:
Hi Nico,

I tried to reproduce your code but registerDataStream keeps failing to register the fields even though i am following your code and the Docs.
here is the error
[error]  found   : Symbol
[error]  required: org.apache.flink.table.expressions.Expression
[error]     tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
[error]  
I think my code snippet was misleading. Here is the full snippet Changing the name from table didn't fix it for

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

object datastreamtotableapi {

  // One record per input line: 21 String columns
  case class Calls(a: String, b: String, c: String, d: String, e: String,
                   f: String, g: String, h: String, i: String, j: String,
                   k: String, l: String, m: String, n: String, p: String,
                   q: String, r: String, s: String, t: String, v: String,
                   w: String)

  def main(args: Array[String]): Unit = {

    // Path of the input text file, passed as --input <path>
    val params = ParameterTool.fromArgs(args)
    val input = params.getRequired("input")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataStream = env.readTextFile(input)

    // Split each line on ',' and map the first 21 columns onto Calls
    val namedStream = dataStream.map((value: String) => {
      val columns = value.split(",")
      Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
        columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
        columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
        columns(18), columns(19), columns(20))
    })

    // Keeps only records whose j AND k fields are exactly " " (a single space)
    val cleanedStream = namedStream.filter(_.j == " ").filter(_.k == " ")

    // Event time = j and k concatenated and parsed as a Long;
    // watermarks trail the highest timestamp seen by 100 seconds
    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
        override def extractTimestamp(element: Calls): Long =
          element.j.concat(element.k).toLong
      })

    tableEnv.registerDataStream("CDRS", watermarkedStream)
    val results = tableEnv.sqlQuery("""
      |SELECT
      | a
      | FROM CDRS
      """.stripMargin)

    val result: Table = results

    val path = "file:///Users/test/1.txt"
    val sink: TableSink[Row] = new CsvTableSink(
      path,                            // output path
      fieldDelim = "|",                // optional: delimit fields by '|'
      numFiles = 1,                    // optional: write to a single file
      writeMode = WriteMode.OVERWRITE) // replace 1.txt if it already exists

    result.writeToSink(sink)

    env.execute("this job")
  }
}
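A possible explanation for the empty file, offered as an observation rather than a confirmed diagnosis: filter(_.j == " ") keeps only records whose j field is a single space, and the timestamp extractor then parses j concatenated with k as a Long. The pure-Scala sketch below (no Flink involved; Rec is a hypothetical two-field stand-in for Calls, and the predicate in keptInverted is an assumed intent, not the original code) shows that a record passing the filter cannot be parsed, while a record that could be parsed never passes. So the stream is likely either empty, leaving the single output file with no rows, or the job fails on the first record that gets through.

```scala
import scala.util.Try

// Hypothetical stand-in for Calls: only the j and k fields matter here.
final case class Rec(j: String, k: String)

object FilterCheck {
  // Same predicate as the job: keep a record only if j AND k are exactly " ".
  def keptAsWritten(rows: Seq[Rec]): Seq[Rec] =
    rows.filter(_.j == " ").filter(_.k == " ")

  // Same expression as extractTimestamp: concatenate j and k, parse as Long.
  // Wrapped in Try so a NumberFormatException shows up as a Failure.
  def timestamp(r: Rec): Try[Long] =
    Try(r.j.concat(r.k).toLong)

  // Assumed intent (not from the original code): drop records with a blank j or k.
  def keptInverted(rows: Seq[Rec]): Seq[Rec] =
    rows.filter(r => r.j.trim.nonEmpty && r.k.trim.nonEmpty)

  def main(args: Array[String]): Unit = {
    val rows = Seq(Rec("20180316", "2250"), Rec(" ", " "))
    println(keptAsWritten(rows))                // only the blank record survives
    println(keptAsWritten(rows).map(timestamp)) // its timestamp cannot be parsed
    println(keptInverted(rows).map(timestamp))  // the numeric record parses
  }
}
```

If blank records are meant to be discarded, the predicate would need to be inverted (for example != " ", or a trim-based check like keptInverted); which one matches the data is an assumption.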

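A second thing that may be worth checking in the parsing step: the map indexes columns(0) through columns(20) after value.split(","), and Java's String.split with the default limit drops trailing empty strings. A line whose last fields are empty therefore yields fewer than 21 elements, and columns(20) throws ArrayIndexOutOfBoundsException. Whether the input actually contains such lines is an assumption; SplitCheck and the sample line are hypothetical.

```scala
object SplitCheck {
  // What the job does: split with the default limit drops trailing empty strings.
  def fieldsDefault(line: String): Array[String] = line.split(",")

  // A negative limit keeps trailing empty strings, so the column count stays fixed.
  def fieldsKeepAll(line: String): Array[String] = line.split(",", -1)

  def main(args: Array[String]): Unit = {
    // Hypothetical line: 21 columns where the last three are empty.
    val line = (1 to 18).mkString(",") + ",,,"
    println(fieldsDefault(line).length) // 18: columns(20) would throw
    println(fieldsKeepAll(line).length) // 21
  }
}
```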


On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <[hidden email]> wrote:
Hi Karim,
when I was trying to reproduce your code, I got an exception with the
name 'table' being used - by replacing it and completing the job with
some input, I did see the csv file popping up. Also, the job was
crashing when the file 1.txt already existed.

The code I used (running Flink 1.5-SNAPSHOT):

  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
      .assignAscendingTimestamps(_._2)
    tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

    val results = tableEnv.sqlQuery( """
                                       |SELECT
                                       | A,C
                                       | FROM mytable
                                     """.stripMargin)

    val result: Table = results

    val path = "file:///tmp/test/1.txt"
    val sink :TableSink[Row]=   new CsvTableSink(
      path,                             // output path
      fieldDelim = "|",                 // optional: delimit files by '|'
      numFiles = 1,                     // optional: write to a single file
      writeMode = WriteMode.NO_OVERWRITE)

    result.writeToSink(sink)

    env.execute("this job")
  }

  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
    val data = new mutable.MutableList[(Int, Long, String)]
    data.+=((1, 1L, "Hi"))
    data.+=((2, 2L, "Hello"))
    data.+=((3, 2L, "Hello world"))
    data.+=((4, 3L, "Hello world, how are you?"))
    data.+=((5, 3L, "I am fine."))
    data.+=((6, 3L, "Luke Skywalker"))
    env.fromCollection(data)
  }


Nico
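Nico's remark that the job crashed when 1.txt already existed is what WriteMode.NO_OVERWRITE is for, and the full snippet switches to WriteMode.OVERWRITE, which replaces the file on every run. The sketch below is only a plain java.nio analogy of those two modes, not Flink's implementation; WriteModeAnalogy and its methods are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardOpenOption}

// Plain java.nio analogy of the two write modes; NOT Flink's implementation.
object WriteModeAnalogy {
  // Like NO_OVERWRITE: fail if the target already exists.
  def writeNoOverwrite(p: Path, content: String): Unit = {
    Files.write(p, content.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE_NEW)
    ()
  }

  // Like OVERWRITE: create the file or truncate an existing one, then write.
  def writeOverwrite(p: Path, content: String): Unit = {
    Files.write(p, content.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
      StandardOpenOption.WRITE)
    ()
  }

  def main(args: Array[String]): Unit = {
    val p = Files.createTempDirectory("csvsink").resolve("1.txt")
    writeNoOverwrite(p, "A|B\n")
    try writeNoOverwrite(p, "C|D\n")
    catch { case e: FileAlreadyExistsException => println(s"second run failed: $e") }
    writeOverwrite(p, "")  // an empty result still replaces the file
    println(Files.size(p)) // 0
  }
}
```

One consequence of OVERWRITE-like behaviour: a run that produces no rows still replaces the previous output, which can look as if the file was truncated.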

On 16/03/18 22:50, karim amer wrote:
> Hi There,
>
> I am trying to write a CSV sink to disk, but it's not getting written. I
> think the file is getting overwritten or truncated once the stream
> process finishes. Does anyone know why the file is getting overwritten
> or truncated and how I can fix this?
>
>
> tableEnv.registerDataStream("table", watermarkedStream)
>
> val results = tableEnv.sqlQuery( """
> |SELECT
> | A
> | FROM table
> """.stripMargin)
>
>
>
> val result: Table = results
>
> val path = "file:///path/test/1.txt"
> val sink :TableSink[Row]=   new CsvTableSink(
>   path, // output path
> fieldDelim = "|", // optional: delimit files by '|'
> numFiles = 1, // optional: write to a single file
> writeMode = WriteMode.NO_OVERWRITE)
>
> result.writeToSink(sink)
>
> env.execute("this job")
>
>
>
>
> Thanks




--
karim amer 