Table API and ProcessWindowFunction

Flavio Pompermaier
Hi all,
From what I understand, a ProcessWindowFunction can only be used in the streaming (DataStream) API.
Is there any plan to port it to the Table API in the near future?
I'd like to do the equivalent of the following with the Table API:

final DataStream<MyPojoEvent> events = env.addSource(src);
events.filter(e -> e.getCode() != null)
    .keyBy(event -> Integer.valueOf(event.getCode()))
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .process(new ProcessWindowFunction<MyPojoEvent, MyPojoEvent, Integer, TimeWindow>() {.....});

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Table API and ProcessWindowFunction

Hequn Cheng
Hi Flavio,

Nice to hear your ideas on the Table API!

Could you be more specific about your requirements? A detailed scenario would be quite helpful. For example, do you want to emit multiple records through the collector, or do you want to use timers?

BTW, the Table API recently introduced flatAggregate (both non-windowed and windowed flatAggregate), which will be included in the upcoming 1.9 release. flatAggregate can emit multiple records for a single group. More details here [1][2].
I hope this solves your problem.

Best, Hequn


In reply to Flavio Pompermaier's message of Mon, Jul 8, 2019 at 6:27 PM.

Re: Table API and ProcessWindowFunction

Flavio Pompermaier
Hi Hequn, thanks for your answer.
What I'm trying to do is read a stream of events that contain a UserId field and, every X minutes (i.e. using a time window) and for each distinct UserId key, query 3 different REST services to enrich my POJOs*.
At the moment I use a ProcessWindowFunction after the .keyBy().window(), as shown in the example in my previous mail, to contact those 3 services and enrich my object.

However, I don't like this solution because I'd like to use Flink to its full potential, so I'd like to enrich my object using LATERAL TABLEs or Async I/O.
The main problem I'm facing right now is that I can't find a way to pass the tumbling window start/end to the LATERAL JOIN table functions (I need it because it is a parameter of the REST query).
Moreover, I don't know whether this use case is something that the Table API aims to solve.

* Of course, this could kill the REST endpoint if the number of users is very large. Because of this, I'd like to keep the external state of the source tables as internal Flink state and then do a JOIN on the UserId. The problem here is that I need to "materialize" them using Debezium (or similar) via Kafka and dynamic tables. Is there any example of keeping multiple tables synced with Flink state through Debezium (without having to rewrite all the logic for managing UPDATE/INSERT/DELETE)?
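
For illustration only, here is a minimal sketch of the three-way enrichment described above, written with plain java.util.concurrent so that it runs without Flink. The `Lookup` interface, the `Enriched` holder and every method name are hypothetical stand-ins for the three REST clients, which the thread does not show. In a Flink job the same pattern would presumably sit inside an async I/O function rather than a stand-alone class.

```java
import java.util.concurrent.CompletableFuture;

final class EnrichmentSketch {

    /** Hypothetical client for one REST service; the window bounds are query parameters. */
    interface Lookup {
        String fetch(String userId, long windowStart, long windowEnd);
    }

    /** Hypothetical holder for the enriched result of one user in one window. */
    static final class Enriched {
        final String userId;
        final String first;
        final String second;
        final String third;

        Enriched(String userId, String first, String second, String third) {
            this.userId = userId;
            this.first = first;
            this.second = second;
            this.third = third;
        }
    }

    /** Queries the three services concurrently and combines their answers. */
    static CompletableFuture<Enriched> enrich(
            String userId, long windowStart, long windowEnd,
            Lookup s1, Lookup s2, Lookup s3) {
        CompletableFuture<String> f1 =
                CompletableFuture.supplyAsync(() -> s1.fetch(userId, windowStart, windowEnd));
        CompletableFuture<String> f2 =
                CompletableFuture.supplyAsync(() -> s2.fetch(userId, windowStart, windowEnd));
        CompletableFuture<String> f3 =
                CompletableFuture.supplyAsync(() -> s3.fetch(userId, windowStart, windowEnd));
        // allOf completes once all three lookups are done, so join() no longer blocks here.
        return CompletableFuture.allOf(f1, f2, f3)
                .thenApply(ignored -> new Enriched(userId, f1.join(), f2.join(), f3.join()));
    }
}
```

The sketch uses the common fork-join pool for brevity; a real job would more likely use a non-blocking HTTP client or a dedicated executor.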

In reply to Hequn Cheng's message of Mon, Jul 8, 2019 at 3:55 PM.

Re: Table API and ProcessWindowFunction

Hequn Cheng
Hi Flavio,

Thanks for the information.

From your description, it seems that you only use the window to get the start and end time; no aggregation happens. If this is the case, you can compute the start and end time yourself (`TimeWindow.getWindowStartWithOffset()` shows how to get the window start from a timestamp). To be more specific, if you use processing time, you can get your timestamp with System.currentTimeMillis() and then use it to get the window start and end with `TimeWindow.getWindowStartWithOffset()`. For event time, you can get the timestamp from the rowtime field.

With the start and end time, you can then perform a LATERAL JOIN to enrich the information. You can add a cache in your table function to avoid contacting the REST endpoint too frequently.
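
As a rough illustration of the caching idea, the sketch below keeps a small LRU cache keyed by user id and window start, so that each (user, window) pair triggers at most one lookup while it stays cached. It is plain Java with no Flink dependency; the class name `WindowedLookupCache`, the `loader` function standing in for the REST call, and the `loads` counter are all assumptions made for this example. It is not thread-safe, and it does not cache null results.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class WindowedLookupCache<V> {
    private final int maxEntries;
    // Hypothetical stand-in for the REST call: (userId, windowStart) -> result.
    private final BiFunction<String, Long, V> loader;
    private final Map<String, V> cache;
    // Number of times the loader was actually invoked (for illustration).
    int loads = 0;

    WindowedLookupCache(int maxEntries, BiFunction<String, Long, V> loader) {
        this.maxEntries = maxEntries;
        this.loader = loader;
        // Access-ordered LinkedHashMap that evicts the least recently used entry.
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > WindowedLookupCache.this.maxEntries;
            }
        };
    }

    /** Returns the cached value for this user and window, loading it on a miss. */
    V get(String userId, long windowStart) {
        String key = userId + "@" + windowStart;
        V value = cache.get(key);
        if (value == null) {
            value = loader.apply(userId, windowStart);
            loads++;
            cache.put(key, value);
        }
        return value;
    }
}
```

Inside a table function, an instance of such a cache could be created once per function instance and consulted before each REST request.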

Best, Hequn


In reply to Flavio Pompermaier's message of Mon, Jul 8, 2019 at 10:46 PM.

Re: Table API and ProcessWindowFunction

Flavio Pompermaier
The problem with the LATERAL JOIN (via a LookupableTableSource + TableFunction, because I need to call that function using the userId as a parameter) is that I cannot know the window start/end. It's not clear to me how to get that from TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize).
Can you provide a pseudo-code example of how to implement this?

In reply to Hequn Cheng's message of Tue, Jul 9, 2019 at 4:15 AM.

--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809

Re: Table API and ProcessWindowFunction

Hequn Cheng
Hi,

> Can you provide a pseudo-code example of how to implement this?
Processing time
If you use TumblingProcessingTimeWindows.of(Time.seconds(1)), then for each record you get the timestamp t from System.currentTimeMillis() and compute w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000) and w_end = w_start + 1000.

Event time
If you use TumblingEventTimeWindows.of(Time.seconds(1)), then for each record you get the timestamp t from the corresponding timestamp field and compute w_start and w_end in the same way as above.

More examples can be found in TimeWindowTest [1].
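
The pseudo-code above can be sketched as runnable Java. To keep the example free of Flink dependencies, `windowStart` re-implements the arithmetic that, to my understanding, Flink 1.x uses in `TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)`; treat that as an assumption and call the Flink method directly in a real job. The class and method names here are made up for the sketch, and all values are in milliseconds.

```java
final class TumblingWindowBounds {

    /** Start (inclusive) of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** End (exclusive) of the tumbling window that contains the given timestamp. */
    static long windowEnd(long timestamp, long offset, long windowSize) {
        return windowStart(timestamp, offset, windowSize) + windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 1000L; // corresponds to Time.seconds(1)

        // Processing time: take the timestamp from the wall clock.
        long t = System.currentTimeMillis();
        long wStart = windowStart(t, 0L, windowSize);
        long wEnd = wStart + windowSize;
        System.out.println("processing-time window = [" + wStart + ", " + wEnd + ")");

        // Event time: take the timestamp from the record's rowtime field instead.
        long rowtime = 1500L; // example value
        System.out.println("event-time window = ["
                + windowStart(rowtime, 0L, windowSize) + ", "
                + windowEnd(rowtime, 0L, windowSize) + ")");
    }
}
```

For a rowtime of 1500 ms and a one-second window without offset, the computed window is [1000, 2000). The resulting start and end can then be passed as parameters to the table function used in the LATERAL JOIN.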

Best, Hequn
 


In reply to Flavio Pompermaier's message of Wed, Jul 10, 2019 at 8:46 PM.

Re: Table API and ProcessWindowFunction

Flavio Pompermaier
Thanks Hequn, I'll give it a try!

Best, Flavio

In reply to Hequn Cheng's message of Thu, Jul 11, 2019 at 3:38 AM.

Re: Table API and ProcessWindowFunction

Flavio Pompermaier
Just one proposal: when working with streaming sources, you often need to define which field is the processing time or row time. Right now you can define the processing-time or event-time field by implementing DefinedProctimeAttribute or DefinedRowtimeAttributes at the source. But this only helps if you use the SQL API; with TableFunctions, for example, you don't have an easy way to get the proctime/rowtime field.
Also, in the Flink exercises [1] you use a POJO where you have to implement a getEventTime() method to retrieve the row-time field.

So why not declare two general interfaces, such as EventTimeObject and ProcessingTimeObject, so that I can declare my objects as implementing those interfaces and easily get the fields I need?

Best,
Flavio


In reply to Flavio Pompermaier's message of Thu, Jul 11, 2019 at 10:01 AM.

Re: Table API and ProcessWindowFunction

Hequn Cheng
Hi Flavio,

I think the reason we don't have interfaces like EventTimeObject and ProcessingTimeObject is that we don't want time attributes to be defined just anywhere. Time attributes are meant to be defined in the source. If we added interfaces like EventTimeObject and ProcessingTimeObject to Flink, it might raise other questions, such as whether we should generate time attributes wherever an object implements EventTimeObject or ProcessingTimeObject. Such an object may exist in a source, an aggregate, or even a sink.

However, I think it's a good idea to add such logic in your own code. For example, you can define a user-defined source that extracts time attributes from EventTimeObject and ProcessingTimeObject, similar to the examples in the exercises.
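
A minimal sketch of what such user-side logic might look like, in plain Java without Flink dependencies. `EventTimeObject` is the interface name proposed earlier in this thread and is not part of Flink; `UserEvent` and `EventTimeSupport` are hypothetical names invented for the example. A custom source or timestamp extractor could call `timestampOf` for each element it emits.

```java
/** User-defined interface (not part of Flink) for objects that carry an event time. */
interface EventTimeObject {
    long getEventTime();
}

/** Hypothetical event type that exposes its event time through the interface. */
final class UserEvent implements EventTimeObject {
    private final String userId;
    private final long eventTime;

    UserEvent(String userId, long eventTime) {
        this.userId = userId;
        this.eventTime = eventTime;
    }

    String getUserId() {
        return userId;
    }

    @Override
    public long getEventTime() {
        return eventTime;
    }
}

/** Helper that user code could call wherever a timestamp is needed. */
final class EventTimeSupport {
    /** Returns the element's event time, or the fallback if it does not carry one. */
    static long timestampOf(Object element, long fallbackTimestamp) {
        if (element instanceof EventTimeObject) {
            return ((EventTimeObject) element).getEventTime();
        }
        return fallbackTimestamp;
    }
}
```

Keeping the interface in user code, as suggested above, leaves the decision of where time attributes are defined with the source.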

Best, Hequn

In reply to Flavio Pompermaier's message of Thu, Jul 11, 2019 at 4:36 PM.