http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/CsvSink-tp18974p19031.html
Hi Nico,
I tried to reproduce your code, but
registerDataStream keeps failing to register the fields, even though I am following your code and the docs.
Here is the error:
[error] found : Symbol
[error] required: org.apache.flink.table.expressions.Expression
[error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
[error]
I think my code snippet was misleading. Here is the full snippet. Changing the name from "table" didn't fix it for me.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row
/**
 * Reads a comma-separated text file as a stream of [[Calls]] records, registers the
 * stream as the SQL table `CDRS`, selects column `a`, and writes the result to a
 * single '|'-delimited CSV file.
 *
 * Program arguments (parsed with `ParameterTool`):
 *  - `--input`  (required) path of the text file to read
 *  - `--output` (optional) path of the CSV file to write; defaults to
 *    `file:///Users/test/1.txt`
 */
object datastreamtotableapi {

  /** One input line; each of the 21 comma-separated columns is kept as a raw string. */
  case class Calls(a: String,
                   b: String,
                   c: String,
                   d: String,
                   e: String,
                   f: String,
                   g: String,
                   h: String,
                   i: String,
                   j: String,
                   k: String,
                   l: String,
                   m: String,
                   n: String,
                   p: String,
                   q: String,
                   r: String,
                   s: String,
                   t: String,
                   v: String,
                   w: String)

  /** Number of comma-separated columns a line needs to be turned into a [[Calls]]. */
  private final val ColumnCount = 21

  /** Output location used when no `--output` argument is supplied. */
  private final val DefaultOutputPath = "file:///Users/test/1.txt"

  /**
   * Builds and runs the job.
   *
   * @param args command-line arguments; must contain `--input <path>` and may contain
   *             `--output <path>`
   */
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val input = params.getRequired("input")
    val path = params.get("output", DefaultOutputPath)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val dataStream = env.readTextFile(input)

    val namedStream = dataStream
      // limit = -1 keeps trailing empty fields; plain split(",") would drop them and
      // make columns(20) fail on lines whose last columns are empty.
      .map((value: String) => value.split(",", -1))
      // Lines with too few columns (e.g. blank lines) are skipped instead of failing
      // the whole job with an ArrayIndexOutOfBoundsException.
      .filter(_.length >= ColumnCount)
      .map((columns: Array[String]) =>
        Calls(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5),
          columns(6), columns(7), columns(8), columns(9), columns(10), columns(11),
          columns(12), columns(13), columns(14), columns(15), columns(16), columns(17),
          columns(18), columns(19), columns(20)))

    // The timestamp below is parsed from j and k, so records where either is blank must
    // be dropped. The previous filter kept ONLY the blank ones, which made every
    // surviving record fail in toLong.
    val cleanedStream = namedStream.filter(call => call.j.trim.nonEmpty && call.k.trim.nonEmpty)

    val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
        // Event timestamp is column j followed by column k, parsed as a Long.
        override def extractTimestamp(element: Calls): Long =
          element.j.concat(element.k).toLong
      })

    tableEnv.registerDataStream("CDRS", watermarkedStream)

    val result: Table = tableEnv.sqlQuery(
      """
        |SELECT
        | a
        | FROM CDRS
      """.stripMargin)

    val sink: TableSink[Row] = new CsvTableSink(
      path,                             // output path
      fieldDelim = "|",                 // delimit fields by '|'
      numFiles = 1,                     // write to a single file
      writeMode = WriteMode.OVERWRITE)  // replace an existing file

    result.writeToSink(sink)
    env.execute("this job")
  }
}