Hello,
the stream Table API is currently under heavy development. So far, it supports selection, filtering, and union operations. For these operations we use the streaming SQL syntax of Apache Calcite [1]. Compared to a batch query, the only difference is the "STREAM" keyword after SELECT.
Registering a DataStream as a table and running a stream SQL query works the same way as for DataSets.
Here's a filtering example in Scala:
--------------
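// imports assume the current package layout (may differ in your build)
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val dataStream = env.addSource(...)
// convert the stream to a table and name its fields
val t = dataStream.toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
// the STREAM keyword marks this as a streaming query
val sqlQuery = "SELECT STREAM * FROM MyTable WHERE a = 3"
val result = tEnv.sql(sqlQuery).toDataStream[Row]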
---------------
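A union with a selection follows the same pattern. This is only a rough sketch that I have not run against the current master. The table names T1 and T2 and the sample elements are made up for illustration, the import paths are my assumption about the current package layout, and I am assuming UNION ALL is the form of union that is supported:
--------------
// imports assume the current package layout (may differ in your build)
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// two small in-memory streams with the same schema (made-up sample data)
val s1 = env.fromElements((1, "x", 1L), (2, "y", 2L))
val s2 = env.fromElements((3, "z", 3L))
tEnv.registerTable("T1", s1.toTable(tEnv).as('a, 'b, 'c))
tEnv.registerTable("T2", s2.toTable(tEnv).as('a, 'b, 'c))

// each branch of the union is a streaming query, so each gets the STREAM keyword
val unionQuery =
  "SELECT STREAM a, b FROM T1 UNION ALL SELECT STREAM a, b FROM T2"
val unionResult = tEnv.sql(unionQuery).toDataStream[Row]

// print the unioned rows and start the job
unionResult.print()
env.execute()
--------------
Both inputs are registered with the same field names so that the two SELECT branches produce matching schemas.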
You can find more details on our plans to support windows and aggregations in the design document [2]. Feedback and ideas are very welcome!
Cheers,
-Vasia.