Hi Karim,

I cannot find a method invocation "tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C)" as shown in the error message anywhere in your example. It would help if you kept the error message and the code consistent; otherwise it's not possible to figure out what's going on.

Best,
Fabian

2018-03-20 0:24 GMT+01:00 karim amer <[hidden email]>:

Hi Nico,

I tried to reproduce your code, but registerDataStream keeps failing to register the fields, even though I am following your code and the docs. Here is the error:
[error] found : Symbol
[error] required: org.apache.flink.table.expressions.Expression
[error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
[error]

I think my code snippet was misleading. Here is the full snippet. Changing the name from "table" didn't fix it:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row
object datastreamtotableapi {
case class Calls(a: String,
b: String,
c: String,
d: String,
e: String,
f: String,
g: String,
h: String,
i: String,
j: String,
k: String,
l: String,
m: String,
n: String,
p: String,
q: String,
r: String,
s: String,
t: String,
v: String,
w: String)
def main(args: Array[String]) {
val params = ParameterTool.fromArgs(args)
val input = params.getRequired("input")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val tableEnv = TableEnvironment.getTableEnvironment(env)
val dataStream = env.readTextFile(input)
val namedStream = dataStream.map((value:String) => {
val columns = value.split(",")
Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
  columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
  columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
  columns(18), columns(19), columns(20)
)
})
// keep only rows whose timestamp fields are non-blank, so toLong below can parse them
val cleanedStream = namedStream.filter(_.j != " ").filter(_.k != " ")
val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
override def extractTimestamp(element: Calls): Long = (element.j.concat(element.k)).toLong
})
tableEnv.registerDataStream("CDRS" , watermarkedStream)
val results = tableEnv.sqlQuery( """
|SELECT
| a
| FROM CDRS
""".stripMargin)
val result: Table = results
val path = "file:///Users/test/1.txt"
val sink: TableSink[Row] = new CsvTableSink(
path, // output path
fieldDelim = "|", // optional: delimit files by '|'
numFiles = 1, // optional: write to a single file
writeMode = WriteMode.OVERWRITE)
result.writeToSink(sink)
env.execute("this job")
}
}
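For the record, the "found: Symbol / required: Expression" error above usually means the Table API's Scala implicits are not in scope. A minimal sketch of the imports that provide the Symbol-to-Expression conversion (assuming a Flink 1.4/1.5-era setup; the "set" stream here is hypothetical):

import org.apache.flink.streaming.api.scala._  // DataStream Scala API implicits
import org.apache.flink.table.api.scala._      // Symbol -> Expression conversions

// With both imports in scope, field references given as Scala symbols compile:
// tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C)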
On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber <[hidden email]> wrote:

Hi Karim,
when I was trying to reproduce your code, I got an exception with the
name 'table' being used - by replacing it and completing the job with
some input, I did see the csv file popping up. Also, the job was
crashing when the file 1.txt already existed.
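For context: "table" is a reserved keyword in the SQL dialect Flink uses (Apache Calcite), which is presumably why that name caused trouble. A minimal sketch of the two workarounds (the escaped form assumes Flink's backtick identifier quoting):

// Option 1: register the stream under a non-reserved name.
tableEnv.registerDataStream("mytable", stream)

// Option 2: keep the name but escape it in the SQL query.
val results = tableEnv.sqlQuery("SELECT A FROM `table`")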
The code I used (running Flink 1.5-SNAPSHOT):
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
.assignAscendingTimestamps(_._2)
tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)
val results = tableEnv.sqlQuery( """
|SELECT
| A,C
| FROM mytable
""".stripMargin)
val result: Table = results
val path = "file:///tmp/test/1.txt"
val sink: TableSink[Row] = new CsvTableSink(
path, // output path
fieldDelim = "|", // optional: delimit files by '|'
numFiles = 1, // optional: write to a single file
writeMode = WriteMode.NO_OVERWRITE)
result.writeToSink(sink)
env.execute("this job")
}
// (assumes: import scala.collection.mutable)
def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))
data.+=((5, 3L, "I am fine."))
data.+=((6, 3L, "Luke Skywalker"))
env.fromCollection(data)
}
Nico
On 16/03/18 22:50, karim amer wrote:
> Hi There,
>
> I am trying to write a CSV sink to disk, but it's not getting written. I
> think the file is getting overwritten or truncated once the stream
> processing finishes. Does anyone know why the file is getting overwritten
> or truncated, and how I can fix this?
>
>
> tableEnv.registerDataStream("table", watermarkedStream)
>
> val results = tableEnv.sqlQuery( """
> |SELECT
> | A
> | FROM table
> """.stripMargin)
>
>
>
> val result: Table = results
>
> val path = "file:///path/test/1.txt"
> val sink: TableSink[Row] = new CsvTableSink(
> path, // output path
> fieldDelim = "|", // optional: delimit files by '|'
> numFiles = 1, // optional: write to a single file
> writeMode = WriteMode.NO_OVERWRITE)
>
> result.writeToSink(sink)
>
> env.execute("this job")
>
>
>
>
> Thanks
> karim amer
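For anyone else hitting the original overwrite question: whether an existing file is replaced or the job fails depends on the write mode handed to the sink. A minimal sketch (semantics as in Flink's FileSystem.WriteMode; the output path is hypothetical):

import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.sinks.CsvTableSink

// NO_OVERWRITE: the job fails if the target file already exists.
// OVERWRITE:    an existing file is replaced on each run.
val sink = new CsvTableSink(
  "file:///tmp/out.csv",            // hypothetical output path
  fieldDelim = "|",
  numFiles = 1,
  writeMode = WriteMode.OVERWRITE)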