Hi,

when converting a DataStream (with watermarks) to a table as described here

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time

I wonder how to use the rowtime in a subsequent window operation _without_ explicitly specifying all field names, relying on case class type inference instead.

Currently, when operating on a stream of events

    case class Event(field1: String, ts: Long)

    val ds: DataStream[Event] = ...

I have to do

    tableEnv.fromDataStream(ds, 'field1, 'ts, 'myRowtime.rowtime)

to do

    .window(Tumble over 1.hours on 'myRowtime as 'w)

afterwards. Is there a way to create the TimeAttribute column without specifying all fields again?

Thanks for your help,

Johannes
Hi Johannes,
I am afraid that this is currently not possible and you do indeed have to pass all fields again, but Timo (cc'ed) might want to correct me if I am wrong.

Best,

Dawid

On 04/10/18 15:08, Johannes Schulte wrote:
> Hi,
>
> when converting a DataStream (with Watermarks) to a table like
> described here
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time
>
> I wonder on how to use the rowtime in a following window operation
> _without_ explicitly specifying all field names and hence rely on case
> class type inference.
>
> Currently when operating on a stream of events
>
> case class Event(field1: String, ts: Long)
>
> val ds: DataStream[Event] = ...
>
> I have to do
>
> tableEnv.fromDataStream(ds, 'field1, 'ts, 'myRowtime.rowtime)
>
> to do
>
> .window(Tumble over 1.hours on 'myRowtime as 'w)
>
> afterwards. Is there a way to create the TimeAttribute column without
> specifying all fields again?
>
> Thanks for your help,
>
> Johannes
Hi Johannes,
this is not supported so far. You could write a little helper method like the following:

    // Turn every field name of the case class into a field expression.
    val s: Seq[Expression] = Types.of[WC].asInstanceOf[CaseClassTypeInfo[WC]].fieldNames.map(Symbol(_).toExpr)
    // Append the rowtime attribute as an additional field.
    val s2: Seq[Expression] = s :+ 'rowtime.rowtime
    tEnv.fromDataSet(input, s2: _*)

Not a very nice solution, but it should work.

Regards,

Timo

On 04.10.18 at 15:40, Dawid Wysakowicz wrote:
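Adapted to the DataStream[Event] case from the original question, the helper in the reply above might look roughly like the sketch below. It assumes the Flink 1.6 Scala Table API and has not been run against a cluster; the names RowtimeFields, withRowtime and toTable are made up for illustration and are not part of Flink.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Expression

// Event type from the question.
case class Event(field1: String, ts: Long)

// Hypothetical helper object, not part of Flink.
object RowtimeFields {

  // Returns one field expression per case class field, followed by a
  // rowtime attribute with the given name (assumption: T is a case class).
  def withRowtime[T: TypeInformation](rowtimeName: Symbol): Seq[Expression] = {
    val fieldNames: Seq[String] = implicitly[TypeInformation[T]] match {
      case cc: CaseClassTypeInfo[_] => cc.fieldNames
      case other =>
        throw new IllegalArgumentException(s"Not a case class type: $other")
    }
    fieldNames.map(name => Symbol(name).toExpr) :+ rowtimeName.rowtime
  }

  // Converts the stream without listing 'field1 and 'ts by hand.
  def toTable(tableEnv: StreamTableEnvironment, ds: DataStream[Event]): Table =
    tableEnv.fromDataStream(ds, withRowtime[Event]('myRowtime): _*)
}
```

The resulting table can then be windowed as in the question, for example with .window(Tumble over 1.hours on 'myRowtime as 'w). Matching on CaseClassTypeInfo instead of casting only changes the failure mode: a non-case-class type produces an IllegalArgumentException rather than a ClassCastException.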
Thanks for the answer, Dawid, and for the helper function, Timo. It's not too bad for my use case (small number of fields); I just wanted to make sure I am not missing something here.

Cheers,

Johannes

On Thu, Oct 4, 2018 at 5:07 PM Timo Walther <[hidden email]> wrote: