Rowtime for Table from DataStream without explicit field names

Johannes Schulte
Hi,

when converting a DataStream (with watermarks) to a table as described here:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time

I wonder how to use the rowtime in a subsequent window operation _without_ explicitly specifying all field names, relying on the case class type inference instead.

Currently when operating on a stream of events 

case class Event(field1: String, ts: Long)

val ds: DataStream[Event] = ...

I have to do 

tableEnv.fromDataStream(ds, 'field1, 'ts, 'myRowtime.rowtime) 

to do

.window(Tumble over 1.hours on 'myRowtime  as 'w)

afterwards. Is there a way to create the time attribute column without specifying all fields again?

Thanks for your help,

Johannes
Re: Rowtime for Table from DataStream without explicit field names

Dawid Wysakowicz-2
Hi Johannes,

I am afraid that this is currently not possible, and indeed you have to
pass all fields again, but Timo (cc'ed) might want to correct me if I am wrong.

Best,

Dawid


Re: Rowtime for Table from DataStream without explicit field names

Timo Walther
Hi Johannes,

this is not supported so far. You could write a little helper method like the following:

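// Collect the field names of the case class (WC here) as expressions.
val s: Seq[Expression] = Types.of[WC].asInstanceOf[CaseClassTypeInfo[WC]].fieldNames.map(Symbol(_).toExpr)

// Append the rowtime attribute as an additional field.
val s2: Seq[Expression] = s :+ 'rowtime.rowtime

// The question is about a DataStream, so the conversion uses fromDataStream.
tEnv.fromDataStream(input, s2: _*)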

Not a very nice solution, but it should work.

Regards,
Timo
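
For reference, the helper suggested above could be wrapped into a reusable method roughly as follows. This is only a sketch, assuming the Flink 1.6 Scala APIs and the Event case class from the question; the names RowtimeHelper, fieldsWithRowtime and RowtimeExample are hypothetical and not part of Flink, and the sample elements are made up for illustration.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Expression

case class Event(field1: String, ts: Long)

// Hypothetical helper object (not part of Flink).
object RowtimeHelper {
  // Returns all case class fields of T as expressions, followed by one
  // additional rowtime attribute with the given name.
  def fieldsWithRowtime[T: TypeInformation](rowtimeName: Symbol): Seq[Expression] =
    implicitly[TypeInformation[T]] match {
      case cc: CaseClassTypeInfo[_] =>
        val fields: Seq[Expression] = cc.fieldNames.map(name => Symbol(name).toExpr)
        fields :+ rowtimeName.rowtime
      case other =>
        throw new IllegalArgumentException(s"Not a case class type: $other")
    }
}

object RowtimeExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Made-up sample data; timestamps and watermarks are taken from the ts field.
    val ds: DataStream[Event] = env
      .fromElements(Event("a", 1000L), Event("a", 2000L))
      .assignAscendingTimestamps(_.ts)

    // Equivalent to: tableEnv.fromDataStream(ds, 'field1, 'ts, 'myRowtime.rowtime)
    val table: Table =
      tableEnv.fromDataStream(ds, RowtimeHelper.fieldsWithRowtime[Event]('myRowtime): _*)

    // The appended attribute can then be used in the window definition.
    val windowed: Table = table
      .window(Tumble over 1.hours on 'myRowtime as 'w)
      .groupBy('field1, 'w)
      .select('field1, 'w.start, 'ts.count)
  }
}
```

The helper only builds the field list, so adding or renaming a field in the case class does not require touching the fromDataStream call. It fails fast for types that are not Scala case classes, since those carry no CaseClassTypeInfo.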


Re: Rowtime for Table from DataStream without explicit field names

Johannes Schulte
Thanks for the answer, Dawid, and for the helper function, Timo.

It's not too bad for my use case (a small number of fields); I just wanted to make sure I am not missing something here.

Cheers,

Johannes
