Streaming SQL Job Switches to FINISHED before all records processed


Streaming SQL Job Switches to FINISHED before all records processed

austin.ce
Hey all,

I'm not sure if I've missed something in the docs, but I'm having a bit of trouble with a streaming SQL job that starts w/ raw SQL queries and then transitions to a more traditional streaming job. I'm on Flink 1.10 using the Blink planner, running locally with no checkpointing.

The job looks roughly like:

CSV 1 -->
CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window w/ process func & custom trigger --> some other ops
CSV 3 --> 
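
[For illustration: a minimal sketch, not the actual job, of how such a pipeline is typically wired up on Flink 1.10 with the Blink planner. Table names, schemas, and the join query below are placeholders, and the custom trigger from the logs (TimedCountTrigger) is stubbed out.]

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class JoinJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Register the CSV files as tables (placeholder schema)
        tEnv.registerTableSource("csv1", CsvTableSource.builder()
                .path("csv1.csv")
                .field("id", Types.STRING)
                .field("amount", Types.INT)
                .build());
        // ... csv2 and csv3 registered the same way

        // Placeholder for the actual join query
        Table joined = tEnv.sqlQuery("SELECT id, amount FROM csv1");

        // Retract stream: Tuple2 of (accumulate flag, row)
        DataStream<Tuple2<Boolean, Row>> retracts = tEnv.toRetractStream(joined, Row.class);

        retracts
                .keyBy(new KeySelector<Tuple2<Boolean, Row>, String>() {
                    @Override
                    public String getKey(Tuple2<Boolean, Row> t) {
                        return (String) t.f1.getField(0);
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                // .trigger(new TimedCountTrigger(...))  // custom trigger, not shown
                .process(new ProcessWindowFunction<Tuple2<Boolean, Row>, Row, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Tuple2<Boolean, Row>> in, Collector<Row> out) {
                        for (Tuple2<Boolean, Row> t : in) {
                            if (t.f0) out.collect(t.f1); // forward "accumulate" records
                        }
                    }
                })
                .print(); // stands in for "some other ops"

        env.execute("join-job-sketch");
    }
}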


When I remove the windowing directly after the `toRetractStream`, the records make it to the "some other ops" stage, but with the windowing, those operations are sometimes not sent any data. I can also get data sent to the downstream operators by putting in a no-op map before the window and placing some breakpoints in there to manually slow down processing.


The logs don't seem to indicate anything went wrong and generally look like:

4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to FINISHED.
4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) [FINISHED]
4820 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
...
4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched from RUNNING to FINISHED.
4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
...
rest of the shutdown
...
Program execution finished
Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
Job Runtime: 783 ms


Is there something I'm missing in my setup? Could it be my custom window trigger? Bug? I'm stumped.


Thanks,
Austin



Re: Streaming SQL Job Switches to FINISHED before all records processed

Till Rohrmann
Hi Austin,

could you share with us the exact job you are running (including the custom window trigger)? This would help us to better understand your problem.

I am also pulling in Klou and Timo who might help with the windowing logic and the Table to DataStream conversion.

Cheers,
Till


Re: Streaming SQL Job Switches to FINISHED before all records processed

austin.ce
Hey Till,

Thanks for the reply -- I'll try to see if I can reproduce this in a small repo and share it with you.

Best,
Austin


Re: Streaming SQL Job Switches to FINISHED before all records processed

austin.ce
Hey all,

Thanks for your patience. I've got a small repo that reproduces the issue here: https://github.com/austince/flink-1.10-sql-windowing-error

Not sure what I'm doing wrong but it feels silly.

Thanks so much!
Austin


Re: Streaming SQL Job Switches to FINISHED before all records processed

Till Rohrmann
Hi Austin,

I believe the problem is the processing-time window. For event time, we send a MAX_WATERMARK at the end of the stream to trigger all remaining windows, but this does not happen for processing-time windows. Hence, if your stream ends while a processing-time window is still open, that window will never fire.

The problem should disappear if you use event time or if you process unbounded streams which never end.
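
[For illustration: a minimal sketch, assuming a Flink 1.10 environment, of switching from the default processing time to event time.]

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimeCharacteristicSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink 1.10 defaults to processing time; event time (or ingestion time)
        // gets a MAX_WATERMARK at end-of-input that flushes remaining windows
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Downstream windows then use an event-time assigner, e.g.
        // TumblingEventTimeWindows.of(Time.minutes(1)) instead of
        // TumblingProcessingTimeWindows.of(Time.minutes(1)).
    }
}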

Cheers,
Till


Re: Streaming SQL Job Switches to FINISHED before all records processed

austin.ce
Perfect, thanks so much Till!


Re: Streaming SQL Job Switches to FINISHED before all records processed

austin.ce
Hey Till,

Just a quick question on time characteristics -- this should work for IngestionTime as well, correct? Is there anything special I need to do to have the CsvTableSource / toRetractStream call carry through the assigned timestamps, or do I have to re-assign timestamps during the conversion? I'm currently getting the `Record has Long.MIN_VALUE timestamp (= no timestamp marker)` error, though I see timestamps being assigned when I step through the AutomaticWatermarkContext.
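
[For illustration: a small diagnostic sketch. In a ProcessFunction, ctx.timestamp() returns null for records that carry no timestamp marker, which corresponds to the Long.MIN_VALUE error above; the Tuple2<Boolean, Row> element type assumes the retract stream from the job described earlier.]

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Pass-through operator that reports records arriving without a timestamp marker
public class TimestampProbe
        extends ProcessFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, Row>> {
    @Override
    public void processElement(Tuple2<Boolean, Row> value, Context ctx,
                               Collector<Tuple2<Boolean, Row>> out) {
        if (ctx.timestamp() == null) {
            System.out.println("no timestamp marker on record: " + value);
        }
        out.collect(value);
    }
}

Dropping a `retracts.process(new TimestampProbe())` in front of the window makes it easy to see whether the conversion preserved timestamps.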

Thanks,
Austin


Re: Streaming SQL Job Switches to FINISHED before all records processed

Till Rohrmann
Hi Austin,

yes, it should also work for ingestion time.

I am not entirely sure whether event time is preserved when converting a Table into a retract stream. It should be possible, and if it is not working, then I guess it is a missing feature. But I am sure that @Timo Walther knows more about it. If in doubt, you could assign a new watermark generator after obtaining the retract stream.
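
[For illustration: a sketch of reassigning timestamps and watermarks on the retract stream, under the assumption that one column of the joined row carries an epoch-millis timestamp; the field index and the 5-second bound are placeholders.]

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

public class RetractStreamTimestamps {
    // Re-stamps each record so event-time windows downstream can fire
    public static DataStream<Tuple2<Boolean, Row>> withEventTimestamps(
            DataStream<Tuple2<Boolean, Row>> retracts) {
        return retracts.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Boolean, Row>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple2<Boolean, Row> element) {
                        // placeholder: field 1 holds an epoch-millis timestamp
                        return (long) element.f1.getField(1);
                    }
                });
    }
}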

Here is also a link to some information about event time and watermarks [1]. Unfortunately, it does not state anything about the direction Table => DataStream.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html

Cheers,
Till


Re: Streaming SQL Job Switches to FINISHED before all records processed

austin.ce
Hey Till,

Thanks for the notes. Yeah, the docs don't mention anything specific to this case; not sure if it's an uncommon one. Assigning timestamps on conversion does solve the issue. I'm happy to take a stab at implementing the feature if it is indeed missing and you all think it'd be worthwhile. I think it's definitely a confusing aspect of working w/ the Table & DataStream APIs together.

Best,
Austin


Re: Streaming SQL Job Switches to FINISHED before all records processed

Till Rohrmann
Hi Austin,

thanks for offering to help. First I would suggest asking Timo whether this is an aspect which is still missing or whether we overlooked it. Based on that we can then take the next steps.

Cheers,
Till


Re: Streaming SQL Job Switches to FINISHED before all records processed

Timo Walther
Hi Austin,

could you share some details of your SQL query with us? The reason I'm asking is that I suspect the rowtime field is not inserted into the `StreamRecord` of the DataStream API. The rowtime field is only inserted if there is a single field in the output of the query that is a valid "time attribute".

Especially after non-time-based joins and aggregations, time attributes lose their properties and become regular timestamps, because timestamps and watermarks might have diverged.

If you know what you're doing, you can also assign the timestamp manually after `toRetractStream` via `assignTimestampsAndWatermarks` and reinsert the field into the stream record. But before you do that, I think it is better to share more information about the query with us.
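
[For illustration: a sketch of the manual reinsertion described above, assuming the query keeps its former rowtime column as field 0 of the row and that timestamps ascend per key; the column index and the java.sql.Timestamp type are assumptions about the schema.]

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.types.Row;

public class RowtimeReinserter {
    // Copies the former time attribute column back into the StreamRecord timestamp
    public static DataStream<Tuple2<Boolean, Row>> reinsertRowtime(
            DataStream<Tuple2<Boolean, Row>> retracts) {
        return retracts.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple2<Boolean, Row>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<Boolean, Row> element) {
                        // placeholder: field 0 holds the old rowtime as java.sql.Timestamp
                        return ((java.sql.Timestamp) element.f1.getField(0)).getTime();
                    }
                });
    }
}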

I hope this helps.

Regards,
Timo




Re: Streaming SQL Job Switches to FINISHED before all records processed

Timo Walther
Btw which planner are you using?

Regards,
Timo


Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL Job Switches to FINISHED before all records processed

austin.ce
Hey Timo,

Sorry for the delayed reply. I'm using the Blink planner with non-time-based joins. I've got an example repo here that shows my query/setup [1]. It's got the manual timestamp assignment commented out for now, but that does indeed solve the issue.
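
For reference, the workaround is roughly the following -- a sketch rather than the exact repo code, where `tableEnv` and `joined` stand in for the environment and the Table produced by the join:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

class RetractStreamTimestamps {
    // `tableEnv` and `joined` are placeholders for the table environment
    // and the Table produced by the SQL join query.
    static DataStream<Tuple2<Boolean, Row>> withTimestamps(
            StreamTableEnvironment tableEnv, Table joined) {
        return tableEnv
                .toRetractStream(joined, Row.class)
                // The time attribute is lost after the non-time-based join,
                // so wall-clock (ingestion-style) timestamps and watermarks
                // are re-assigned manually here.
                .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
    }
}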

I'd really like to not worry about time at all in this job, hah -- I started out just using processing time, but Till pointed out that processing-time timers won't fire when input ends, which is the case for this streaming job processing CSV files, so I should be using event time. With that suggestion, I switched to ingestion time, where I then discovered the issue converting from SQL to a DataStream.

IMO, as a user, manually assigning timestamps on conversion makes sense if you're using event time and already handling time attributes yourself, but for ingestion time you really don't want to think about time at all, which is why it might make sense to propagate the automatically assigned timestamps in that case. Though I'm not sure how difficult that would be. Let me know what you think!


Best + thanks again,
Austin

[1]: https://github.com/austince/flink-1.10-sql-windowing-error


Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL Job Switches to FINISHED before all records processed

Timo Walther
Hi Austin,

if you don't want to worry about time at all, you should probably not
use any windows, since windows are a time-based operation.

A solution that would look a bit nicer could be to use a plain
KeyedProcessFunction and implement the deduplication logic without
windows. In a ProcessFunction you can register an event-time timer.
That timer is triggered by the MAX_WATERMARK that is sent when the
pipeline shuts down, even if no timestamp is assigned in the
StreamRecord. As far as I know, the watermark also leaves SQL without
a time attribute.
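
A rough, untested sketch of what I mean (class, state, and type names are only illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffers the latest record per key and emits it once the final
// MAX_WATERMARK arrives at end of input.
public class EmitLatestOnEndOfInput<K, T> extends KeyedProcessFunction<K, T, T> {

    private final TypeInformation<T> typeInfo;
    private transient ValueState<T> latest;

    public EmitLatestOnEndOfInput(TypeInformation<T> typeInfo) {
        this.typeInfo = typeInfo;
    }

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(
            new ValueStateDescriptor<>("latest", typeInfo));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        if (latest.value() == null) {
            // A timer at Long.MAX_VALUE only fires when the final
            // MAX_WATERMARK arrives, i.e. when the bounded input ends.
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        }
        // Later duplicates simply overwrite earlier ones.
        latest.update(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        T buffered = latest.value();
        if (buffered != null) {
            out.collect(buffered);
        }
        latest.clear();
    }
}

You would key the retract stream by your deduplication key and apply it with `.process(new EmitLatestOnEndOfInput<>(...))`.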

Regards,
Timo


Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL Job Switches to FINISHED before all records processed

austin.ce
Hey Timo,

Hah, that's a fair point about using time. I guess I should update my statement to "as a user, I don't want to worry about manually managing time".

That's a nice suggestion with the KeyedProcessFunction and no windows -- I'll give that a shot. If I don't want to emit any duplicates, I'd essentially have to buffer the "last seen duplicate" for each key in that process function until the MAX_WATERMARK is sent through, right? I could emit early results if I assume a maximum number of possible duplicates, but for records with no duplicates I'd still have to wait until no more records are coming -- am I missing something?
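
For the early-emit case, I'm picturing something like this rough sketch (`maxDuplicates` is a made-up threshold, and I've fixed the key/record types to String just for illustration):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits a key's record early once `maxDuplicates` copies have been seen,
// otherwise falls back to flushing when the MAX_WATERMARK arrives.
public class EarlyEmitDedup extends KeyedProcessFunction<String, String, String> {

    private final int maxDuplicates;
    private transient ValueState<String> latest;
    private transient ValueState<Integer> count;

    public EarlyEmitDedup(int maxDuplicates) {
        this.maxDuplicates = maxDuplicates;
    }

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(
            new ValueStateDescriptor<>("latest", String.class));
        count = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        int seen = count.value() == null ? 1 : count.value() + 1;
        if (seen == 1) {
            // Safety net for keys that never reach the threshold: fires
            // when the final MAX_WATERMARK is sent at end of input.
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        }
        if (seen >= maxDuplicates) {
            // All expected duplicates arrived; emit without waiting.
            // (Assumes at most maxDuplicates copies per key.)
            out.collect(value);
            latest.clear();
            count.clear();
        } else {
            latest.update(value);
            count.update(seen);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String buffered = latest.value();
        if (buffered != null) {
            out.collect(buffered);
            latest.clear();
        }
    }
}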

Thanks so much,
Austin

On Fri, Oct 9, 2020 at 10:44 AM Timo Walther <[hidden email]> wrote:
Hi Austin,

if you don't want to worry about time at all, you should probably not
use any windows because those are a time-based operation.

A solution that would look a bit nicer could be to use a pure
KeyedProcessFunction and implement the deduplication logic without
reusing windows. In ProcessFunctions you can register an event-time
timer. The timer would be triggered by the MAX_WATERMARK when the
pipeline shuts down even without having a timestamp assigned in the
StreamRecord. Watermark will leave SQL also without a time attribute as
far as I know.

Regards,
Timo


On 08.10.20 17:38, Austin Cawley-Edwards wrote:
> Hey Timo,
>
> Sorry for the delayed reply. I'm using the Blink planner and using
> non-time-based joins. I've got an example repo here that shows my query/
> setup [1]. It's got the manual timestamp assignment commented out for
> now, but that does indeed solve the issue.
>
> I'd really like to not worry about time at all in this job hah -- I
> started just using processing time, but Till pointed out that processing
> time timers won't be fired when input ends, which is the case for this
> streaming job processing CSV files, so I should be using event time.
> With that suggestion, I switched to ingestion time, where I then
> discovered the issue converting from SQL to data stream.
>
> IMO, as a user manually assigning timestamps on conversion makes sense
> if you're using event time and already handling time attributes
> yourself, but for ingestion time you really don't want to think about
> time at all, which is why it might make sense to propigate the
> automatically assigned timestamps in that case. Though not sure how
> difficult that would be. Let me know what you think!
>
>
> Best + thanks again,
> Austin
>
> [1]: https://github.com/austince/flink-1.10-sql-windowing-error
>
> On Mon, Oct 5, 2020 at 4:24 AM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Btw which planner are you using?
>
>     Regards,
>     Timo
>
>     On 05.10.20 10:23, Timo Walther wrote:
>      > Hi Austin,
>      >
>      > could you share some details of your SQL query with us? The
>     reason why
>      > I'm asking is because I guess that the rowtime field is not inserted
>      > into the `StreamRecord` of DataStream API. The rowtime field is only
>      > inserted if there is a single field in the output of the query
>     that is a
>      > valid "time attribute".
>      >
>      > Esp. after non-time-based joins and aggregations, time attributes
>     loose
>      > there properties and become regular timestamps. Because timestamp
>     and
>      > watermarks might have diverged.
>      >
>      > If you know what you're doing, you can also assign the timestamp
>      > manually after `toRetractStream.assignTimestampAndWatermarks` and
>      > reinsert the field into the stream record. But before you do that, I
>      > think it is better to share more information about the query with us.
>      >
>      > I hope this helps.
>      >
>      > Regards,
>      > Timo
>      >
>      >
>      >
>      > On 05.10.20 09:25, Till Rohrmann wrote:
>      >> Hi Austin,
>      >>
>      >> thanks for offering to help. First I would suggest asking Timo
>     whether
>      >> this is an aspect which is still missing or whether we
>     overlooked it.
>      >> Based on that we can then take the next steps.
>      >>
>      >> Cheers,
>      >> Till
>      >>
>      >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
>      >> <[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>>
>     wrote:
>      >>
>      >>     Hey Till,
>      >>
>      >>     Thanks for the notes. Yeah, the docs don't mention anything
>     specific
>      >>     to this case, not sure if it's an uncommon one. Assigning
>     timestamps
>      >>     on conversion does solve the issue. I'm happy to take a stab at
>      >>     implementing the feature if it is indeed missing and you all
>     think
>      >>     it'd be worthwhile. I think it's definitely a confusing
>     aspect of
>      >>     working w/ the Table & DataStream APIs together.
>      >>
>      >>     Best,
>      >>     Austin
>      >>
>      >>     On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann
>     <[hidden email] <mailto:[hidden email]>
>      >>     <mailto:[hidden email] <mailto:[hidden email]>>>
>     wrote:
>      >>
>      >>         Hi Austin,
>      >>
>      >>         yes, it should also work for ingestion time.
>      >>
>      >>         I am not entirely sure whether event time is preserved when
>      >>         converting a Table into a retract stream. It should be
>     possible
>      >>         and if it is not working, then I guess it is a missing
>     feature.
>      >>         But I am sure that @Timo Walther
>      >>         <mailto:[hidden email]
>     <mailto:[hidden email]>> knows more about it. In doubt, you
>      >>         could assign a new watermark generator when having
>     obtained the
>      >>         retract stream.
>      >>
>      >>         Here is also a link to some information about event time and
>      >>         watermarks [1]. Unfortunately, it does not state
>     anything about
>      >>         the direction Table => DataStream.
>      >>
>      >>         [1]
>      >>
>      >>
>     https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
>
>      >>
>      >>
>      >>         Cheers,
>      >>         Till
>      >>
>      >>         On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards
>      >>         <[hidden email]
>     <mailto:[hidden email]> <mailto:[hidden email]
>     <mailto:[hidden email]>>> wrote:
>      >>
>      >>             Hey Till,
>      >>
>      >>             Just a quick question on time characteristics --
>     this should
>      >>             work for IngestionTime as well, correct? Is there
>     anything
>      >>             special I need to do to have the CsvTableSource/
>      >>             toRetractStream call to carry through the assigned
>      >>             timestamps, or do I have to re-assign timestamps
>     during the
>      >>             conversion? I'm currently getting the `Record has
>      >>             Long.MIN_VALUE timestamp (= no timestamp marker)` error,
>      >>             though I'm seeing timestamps being assigned if I step
>      >>             through the AutomaticWatermarkContext.
>      >>
>      >>             Thanks,
>      >>             Austin
>      >>
>      >>             On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards
>      >>             <[hidden email]
>     <mailto:[hidden email]> <mailto:[hidden email]
>     <mailto:[hidden email]>>>
>      >>             wrote:
>      >>
>      >>                 Perfect, thanks so much Till!
>      >>
>      >>                 On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann
>      >>                 <[hidden email]
>     <mailto:[hidden email]> <mailto:[hidden email]
>     <mailto:[hidden email]>>>
>      >> wrote:
>      >>
>      >>                     Hi Austin,
>      >>
>      >>                     I believe that the problem is the processing
>     time
>      >>                     window. Unlike for event time where we send a
>      >>                     MAX_WATERMARK at the end of the stream to
>     trigger
>      >>                     all remaining windows, this does not happen for
>      >>                     processing time windows. Hence, if your
>     stream ends
>      >>                     and you still have an open processing time
>     window,
>      >>                     then it will never get triggered.
>      >>
>      >>                     The problem should disappear if you use
>     event time
>      >>                     or if you process unbounded streams which
>     never end.
>      >>
>      >>                     Cheers,
>      >>                     Till
>      >>
>      >> On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards
>      >> <[hidden email]> wrote:
>      >>
>      >> Hey all,
>      >>
>      >> Thanks for your patience. I've got a small repo that reproduces
>      >> the issue here:
>      >> https://github.com/austince/flink-1.10-sql-windowing-error
>      >>
>      >> Not sure what I'm doing wrong, but it feels silly.
>      >>
>      >> Thanks so much!
>      >> Austin
>      >>
>      >> On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards
>      >> <[hidden email]> wrote:
>      >>
>      >> Hey Till,
>      >>
>      >> Thanks for the reply -- I'll try to see if I can reproduce this
>      >> in a small repo and share it with you.
>      >>
>      >> Best,
>      >> Austin
>      >>

Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL Job Switches to FINISHED before all records processed

Timo Walther
Hi Austin,

your explanation of the KeyedProcessFunction implementation sounds good
to me. Using the time and state primitives for this task will make the
implementation more explicit and also more readable.
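
For illustration, a minimal sketch of what such a function could look
like. The Row element type, String key, and state name are placeholders,
not taken from your job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class DeduplicateOnEndOfInput extends KeyedProcessFunction<String, Row, Row> {

    private transient ValueState<Row> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-seen", Row.class));
    }

    @Override
    public void processElement(Row value, Context ctx, Collector<Row> out)
            throws Exception {
        // Buffer only the latest record per key; earlier duplicates are dropped.
        lastSeen.update(value);
        // An event-time timer at Long.MAX_VALUE fires when the MAX_WATERMARK
        // arrives at end of input, even if the records carry no timestamps.
        // Re-registering the same timestamp for a key is a no-op.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out)
            throws Exception {
        Row buffered = lastSeen.value();
        if (buffered != null) {
            out.collect(buffered); // exactly one record per key at end of input
            lastSeen.clear();
        }
    }
}

The Long.MAX_VALUE timer only fires when the final watermark flows through
at shutdown, so nothing is emitted early and each key produces exactly one
record.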

Let me know if you could solve your use case.

Regards,
Timo


On 09.10.20 17:27, Austin Cawley-Edwards wrote:

> Hey Timo,
>
> Hah, that's a fair point about using time. I guess I should update my
> statement to "as a user, I don't want to worry about /manually managing/
> time".
>
> That's a nice suggestion with the KeyedProcessFunction and no windows;
> I'll give that a shot. If I don't want to emit any duplicates, I'd have
> to essentially buffer the "last seen duplicate" for each key in that
> process function until the MAX_WATERMARK is sent through, right?
> I could emit early results if I assumed the max number of possible
> duplicates, but for records with no duplicates, I'd have to wait until
> no more records are coming -- am I missing something?
>
> Thanks so much,
> Austin
>
> On Fri, Oct 9, 2020 at 10:44 AM Timo Walther <[hidden email]> wrote:
>
>     Hi Austin,
>
>     if you don't want to worry about time at all, you should probably not
>     use any windows, because those are a time-based operation.
>
>     A solution that would look a bit nicer could be to use a pure
>     KeyedProcessFunction and implement the deduplication logic without
>     using windows. In a ProcessFunction you can register an event-time
>     timer. The timer would be triggered by the MAX_WATERMARK when the
>     pipeline shuts down, even without a timestamp assigned in the
>     StreamRecord. The watermark will also leave SQL without a time
>     attribute, as far as I know.
>
>     Regards,
>     Timo
>
>
>     On 08.10.20 17:38, Austin Cawley-Edwards wrote:
>      > Hey Timo,
>      >
>      > Sorry for the delayed reply. I'm using the Blink planner with
>      > non-time-based joins. I've got an example repo here that shows my
>      > query/setup [1]. It's got the manual timestamp assignment commented
>      > out for now, but that does indeed solve the issue.
>      >
>      > I'd really like to not worry about time at all in this job hah -- I
>      > started just using processing time, but Till pointed out that
>      > processing-time timers won't be fired when input ends, which is the
>      > case for this streaming job processing CSV files, so I should be
>      > using event time. With that suggestion, I switched to ingestion
>      > time, where I then discovered the issue converting from SQL to a
>      > data stream.
>      >
>      > IMO, as a user, manually assigning timestamps on conversion makes
>      > sense if you're using event time and already handling time
>      > attributes yourself, but for ingestion time you really don't want
>      > to think about time at all, which is why it might make sense to
>      > propagate the automatically assigned timestamps in that case --
>      > though I'm not sure how difficult that would be. Let me know what
>      > you think!
>      >
>      > Best + thanks again,
>      > Austin
>      >
>      > [1]: https://github.com/austince/flink-1.10-sql-windowing-error
>      >
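
The manual assignment Austin mentions could look roughly like this -- a
sketch only, assuming the job uses ingestion time; the class and method
names are made up, not taken from his repo:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public final class RetractStreamTimestamps {
    /** Converts the table and immediately re-assigns timestamps/watermarks. */
    public static DataStream<Tuple2<Boolean, Row>> toTimestampedRetractStream(
            StreamTableEnvironment tableEnv, Table table) {
        // After toRetractStream, the StreamRecords carry no timestamps (hence
        // the Long.MIN_VALUE error downstream), so assign fresh
        // ingestion-style timestamps right after the conversion.
        return tableEnv
                .toRetractStream(table, Row.class)
                .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
    }
}
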
>      > On Mon, Oct 5, 2020 at 4:24 AM Timo Walther <[hidden email]> wrote:
>      >
>      >     Btw which planner are you using?
>      >
>      >     Regards,
>      >     Timo
>      >
>      >     On 05.10.20 10:23, Timo Walther wrote:
>      >      > Hi Austin,
>      >      >
>      >      > could you share some details of your SQL query with us? The
>      >      > reason why I'm asking is that I guess the rowtime field is
>      >      > not inserted into the `StreamRecord` of the DataStream API.
>      >      > The rowtime field is only inserted if there is a single field
>      >      > in the output of the query that is a valid "time attribute".
>      >      >
>      >      > Especially after non-time-based joins and aggregations, time
>      >      > attributes lose their properties and become regular
>      >      > timestamps, because timestamps and watermarks might have
>      >      > diverged.
>      >      >
>      >      > If you know what you're doing, you can also assign the
>      >      > timestamp manually via `assignTimestampsAndWatermarks` after
>      >      > `toRetractStream` and reinsert the field into the stream
>      >      > record. But before you do that, I think it is better to share
>      >      > more information about the query with us.
>      >      >
>      >      > I hope this helps.
>      >      >
>      >      > Regards,
>      >      > Timo
>      >      >
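
To make the "valid time attribute" condition concrete, a made-up example
(t1, t2, and the ts column are assumptions, and tableEnv stands for a
StreamTableEnvironment):

// Illustrative only: t1 and t2 are assumed registered tables where t1.ts
// is a rowtime attribute.
Table keepsRowtime = tableEnv.sqlQuery(
        // a plain projection keeps `ts` a valid time attribute, so the
        // conversion writes it into the StreamRecord
        "SELECT id, ts FROM t1");

Table losesRowtime = tableEnv.sqlQuery(
        // a regular (non-time-windowed) join demotes `ts` to a plain
        // TIMESTAMP, so converted records carry no StreamRecord timestamp
        "SELECT t1.id, t1.ts FROM t1 JOIN t2 ON t1.id = t2.id");
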
>      >      > On 05.10.20 09:25, Till Rohrmann wrote:
>      >      >> Hi Austin,
>      >      >>
>      >      >> thanks for offering to help. First I would suggest asking
>      >      >> Timo whether this is an aspect which is still missing or
>      >      >> whether we overlooked it. Based on that we can then take
>      >      >> the next steps.
>      >      >>
>      >      >> Cheers,
>      >      >> Till
>      >      >>
>      >      >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
>      >      >> <[hidden email]> wrote:
>      >      >>
>      >      >> Hey Till,
>      >      >>
>      >      >> Thanks for the notes. Yeah, the docs don't mention anything
>      >      >> specific to this case -- not sure if it's an uncommon one.
>      >      >> Assigning timestamps on conversion does solve the issue.
>      >      >> I'm happy to take a stab at implementing the feature if it
>      >      >> is indeed missing and you all think it'd be worthwhile. I
>      >      >> think it's definitely a confusing aspect of working w/ the
>      >      >> Table & DataStream APIs together.
>      >      >>
>      >      >> Best,
>      >      >> Austin
>      >      >>
>      >      >> On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann
>      >      >> <[hidden email]> wrote:
>      >      >>
>      >      >> Hi Austin,
>      >      >>
>      >      >> yes, it should also work for ingestion time.
>      >      >>
>      >      >> I am not entirely sure whether event time is preserved when
>      >      >> converting a Table into a retract stream. It should be
>      >      >> possible, and if it is not working, then I guess it is a
>      >      >> missing feature. But I am sure that @Timo Walther knows more
>      >      >> about it. In doubt, you could assign a new watermark
>      >      >> generator when having obtained the retract stream.
>      >      >>
>      >      >> Here is also a link to some information about event time
>      >      >> and watermarks [1]. Unfortunately, it does not state
>      >      >> anything about the direction Table => DataStream.
>      >      >>
>      >      >> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
>      >      >>
>      >      >> Cheers,
>      >      >> Till
>      >      >>
>      >      >>         On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards
>      >      >>         <[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>> <mailto:[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>      >      >>
>      >      >>             Hey Till,
>      >      >>
>      >      >>             Just a quick question on time characteristics --
>      >     this should
>      >      >>             work for IngestionTime as well, correct? Is there
>      >     anything
>      >      >>             special I need to do to have the CsvTableSource/
>      >      >>             toRetractStream call to carry through the
>     assigned
>      >      >>             timestamps, or do I have to re-assign timestamps
>      >     during the
>      >      >>             conversion? I'm currently getting the `Record has
>      >      >>             Long.MIN_VALUE timestamp (= no timestamp
>     marker)` error,
>      >      >>             though I'm seeing timestamps being assigned
>     if I step
>      >      >>             through the AutomaticWatermarkContext.
>      >      >>
>      >      >>             Thanks,
>      >      >>             Austin
>      >      >>
>      >      >>             On Thu, Oct 1, 2020 at 10:52 AM Austin
>     Cawley-Edwards
>      >      >>             <[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>> <mailto:[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>>
>      >      >>             wrote:
>      >      >>
>      >      >>                 Perfect, thanks so much Till!
>      >      >>
>      >      >>                 On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann
>      >      >>                 <[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email] <mailto:[hidden email]>>
>     <mailto:[hidden email] <mailto:[hidden email]>
>      >     <mailto:[hidden email] <mailto:[hidden email]>>>>
>      >      >> wrote:
>      >      >>
>      >      >>                     Hi Austin,
>      >      >>
>      >      >>                     I believe that the problem is the
>     processing
>      >     time
>      >      >>                     window. Unlike for event time where
>     we send a
>      >      >>                     MAX_WATERMARK at the end of the stream to
>      >     trigger
>      >      >>                     all remaining windows, this does not
>     happen for
>      >      >>                     processing time windows. Hence, if your
>      >     stream ends
>      >      >>                     and you still have an open processing
>     time
>      >     window,
>      >      >>                     then it will never get triggered.
>      >      >>
>      >      >>                     The problem should disappear if you use
>      >     event time
>      >      >>                     or if you process unbounded streams which
>      >     never end.
>      >      >>
>      >      >>                     Cheers,
>      >      >>                     Till
>      >      >>
>      >      >>                     On Thu, Oct 1, 2020 at 12:01 AM Austin
>      >      >>                     Cawley-Edwards
>     <[hidden email] <mailto:[hidden email]>
>      >     <mailto:[hidden email] <mailto:[hidden email]>>
>      >      >>                     <mailto:[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>      >      >>
>      >      >>                         Hey all,
>      >      >>
>      >      >>                         Thanks for your patience. I've got a
>      >     small repo
>      >      >>                         that reproduces the issue here:
>      >      >>
>      >      >> https://github.com/austince/flink-1.10-sql-windowing-error
>      >      >>
>      >      >>
>      >      >>                         Not sure what I'm doing wrong but it
>      >     feels silly.
>      >      >>
>      >      >>                         Thanks so much!
>      >      >>                         Austin
>      >      >>
>      >      >>                         On Tue, Sep 29, 2020 at 3:48 PM
>     Austin
>      >      >>                         Cawley-Edwards
>     <[hidden email] <mailto:[hidden email]>
>      >     <mailto:[hidden email] <mailto:[hidden email]>>
>      >      >>                         <mailto:[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>      >      >>
>      >      >>                             Hey Till,
>      >      >>
>      >      >>                             Thanks for the reply -- I'll
>     try to
>      >     see if I
>      >      >>                             can reproduce this in a small
>     repo
>      >     and share
>      >      >>                             it with you.
>      >      >>
>      >      >>                             Best,
>      >      >>                             Austin
>      >      >>
>      >      >>                             On Tue, Sep 29, 2020 at 3:58
>     AM Till
>      >      >>                             Rohrmann
>     <[hidden email] <mailto:[hidden email]>
>      >     <mailto:[hidden email] <mailto:[hidden email]>>
>      >      >>                             <mailto:[hidden email]
>     <mailto:[hidden email]>
>      >     <mailto:[hidden email] <mailto:[hidden email]>>>>
>     wrote:
>      >      >>
>      >      >>                                 Hi Austin,
>      >      >>
>      >      >>                                 could you share with us the
>      >     exact job
>      >      >>                                 you are running
>     (including the
>      >     custom
>      >      >>                                 window trigger)? This
>     would help
>      >     us to
>      >      >>                                 better understand your
>     problem.
>      >      >>
>      >      >>                                 I am also pulling in Klou and
>      >     Timo who
>      >      >>                                 might help with the windowing
>      >     logic and
>      >      >>                                 the Table to DataStream
>     conversion.
>      >      >>
>      >      >>                                 Cheers,
>      >      >>                                 Till
>      >      >>
>      >      >>                                 On Mon, Sep 28, 2020 at
>     11:49 PM
>      >     Austin
>      >      >>                                 Cawley-Edwards
>      >     <[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>
>      >      >>                                
>     <mailto:[hidden email] <mailto:[hidden email]>
>      >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>      >      >>
>      >      >>                                     Hey all,
>      >      >>
>      >      >>                                     I'm not sure if I've
>     missed
>      >      >>                                     something in the
>     docs, but I'm
>      >      >>                                     having a bit of
>     trouble with a
>      >      >>                                     streaming SQL job that
>      >     starts w/ raw
>      >      >>                                     SQL queries and then
>      >     transitions to
>      >      >>                                     a more traditional
>     streaming
>      >     job.
>      >      >>                                     I'm on Flink 1.10
>     using the
>      >     Blink
>      >      >>                                     planner, running
>     locally with no
>      >      >>                                     checkpointing.
>      >      >>
>      >      >>                                     The job looks roughly
>     like:
>      >      >>
>      >      >>                                     CSV 1 -->
>      >      >>                                     CSV 2 -->  SQL Query
>     to Join -->
>      >      >>                                     toRetractStream -->
>     keyed time
>      >      >>                                     window w/ process
>     func & custom
>      >      >>                                     trigger --> some
>     other ops
>      >      >>                                     CSV 3 -->
>      >      >>
>      >      >>
>      >      >>                                     When I remove the
>     windowing
>      >     directly
>      >      >>                                     after the
>     `toRetractStream`, the
>      >      >>                                     records make it to
>     the "some
>      >     other
>      >      >>                                     ops" stage, but with the
>      >     windowing,
>      >      >>                                     those operations are
>      >     sometimes not
>      >      >>                                     sent any data. I can also
>      >     get data
>      >      >>                                     sent to the downstream
>      >     operators by
>      >      >>                                     putting in a no-op map
>      >     before the
>      >      >>                                     window and placing some
>      >     breakpoints
>      >      >>                                     in there to manually
>     slow down
>      >      >>                                     processing.
>      >      >>
>      >      >>
>      >      >>                                     The logs don't seem
>     to indicate
>      >      >>                                     anything went wrong and
>      >     generally
>      >      >>                                     look like:
>      >      >>
>      >      >>                                     4819 [Source: Custom
>     File source
>      >      >>                                     (1/1)] INFO
>      >      >>
>      >      >>  org.apache.flink.runtime.taskmanager.Task  - Source:
>     Custom File
>      >      >> source (1/1) (3578629787c777320d9ab030c004abd4) switched from
>      >     RUNNING
>      >      >> to FINISHED.\4819 [Source: Custom File source (1/1)] INFO
>      >      >>  org.apache.flink.runtime.taskmanager.Task  - Freeing task
>      >     resources
>      >      >> for Source: Custom File source (1/1)
>      >     (3578629787c777320d9ab030c004abd4).
>      >      >>                                     4819 [Source: Custom
>     File source
>      >      >>                                     (1/1)] INFO
>      >      >>
>      >      >>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all
>      >     FileSystem
>      >      >> streams are closed for task Source: Custom File source (1/1)
>      >      >> (3578629787c777320d9ab030c004abd4) [FINISHED]
>      >      >>                                     4820
>      >      >>
>      >      >> [flink-akka.actor.default-dispatcher-5]
>      >      >>                                     INFO
>      >      >>
>      >      >>  org.apache.flink.runtime.taskexecutor.TaskExecutor  -
>      >     Un-registering
>      >      >> task and sending final execution state FINISHED to JobManager
>      >     for task
>      >      >> Source: Custom File source (1/1)
>     3578629787c777320d9ab030c004abd4.
>      >      >>                                     ...
>      >      >>                                     4996
>      >      >>
>      >      >> [Window(TumblingProcessingTimeWindows(60000),
>      >      >>                                     TimedCountTrigger,
>      >      >>                                     ProcessWindowFunction$1)
>      >     (1/1)] INFO
>      >      >>
>      >      >>  org.apache.flink.runtime.taskmanager.Task  -
>      >      >> Window(TumblingProcessingTimeWindows(60000),
>     TimedCountTrigger,
>      >      >> ProcessWindowFunction$1) (1/1)
>     (907acf9bfa2f4a9bbd13997b8b34d91f)
>      >      >> switched from RUNNING to FINISHED.
>      >      >>                                     4996
>      >      >>
>      >      >> [Window(TumblingProcessingTimeWindows(60000),
>      >      >>                                     TimedCountTrigger,
>      >      >>                                     ProcessWindowFunction$1)
>      >     (1/1)] INFO
>      >      >>
>      >      >>  org.apache.flink.runtime.taskmanager.Task  - Freeing task
>      >     resources
>      >      >> for Window(TumblingProcessingTimeWindows(60000),
>     TimedCountTrigger,
>      >      >> ProcessWindowFunction$1) (1/1)
>     (907acf9bfa2f4a9bbd13997b8b34d91f).
>      >      >>                                     4996
>      >      >>
>      >      >> [Window(TumblingProcessingTimeWindows(60000),
>      >      >>                                     TimedCountTrigger,
>      >      >>                                     ProcessWindowFunction$1)
>      >     (1/1)] INFO
>      >      >>
>      >      >>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all
>      >     FileSystem
>      >      >> streams are closed for task
>      >      >> Window(TumblingProcessingTimeWindows(60000),
>     TimedCountTrigger,
>      >      >> ProcessWindowFunction$1) (1/1)
>     (907acf9bfa2f4a9bbd13997b8b34d91f)
>      >      >> [FINISHED]
>      >      >>                                     ...
>      >      >>                                     rest of the shutdown
>      >      >>                                     ...
>      >      >>                                     Program execution
>     finished
>      >      >>                                     Job with JobID
>      >      >>
>      >     889b161e432c0e69a8d760bbed205d5d has
>      >      >>                                     finished.
>      >      >>                                     Job Runtime: 783 ms
>      >      >>
>      >      >>
>      >      >>                                     Is there something I'm
>      >     missing in my
>      >      >>                                     setup? Could it be my
>     custom
>      >     window
>      >      >>                                     trigger? Bug? I'm
>     stumped.
>      >      >>
>      >      >>
>      >      >>                                     Thanks,
>      >      >>                                     Austin
>      >      >>
>      >      >>
>      >      >>
>      >      >>
>      >      >>
>      >      >>
>      >      >>
>      >      >>
>      >      >>
>      >      >>
>      >      >>
>      >      >>
>      >      >
>      >
>