Sliding window buffering on restart without save point


Sliding window buffering on restart without save point

shater93
Hello,

I have a Flink pipeline processing data in several overlapping (sliding)
windows spanning [t_i, t_i + T], where t_i is the window start time and T is
the window size. The slide is t_(i+1) - t_i = T/6 (i.e. at any point in time,
6 windows overlap).
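To make the overlap concrete, here is a minimal sketch (plain Python, not the poster's code or the Flink API; times are illustrative epoch milliseconds) of how a sliding assigner with slide = T/6 places every event into exactly 6 windows:

```python
def assign_windows(event_time, size, slide):
    """Return the [start, end) spans of all sliding windows containing event_time.

    With slide = size / 6, every event falls into exactly 6 overlapping
    windows. Names and numbers here are illustrative, not Flink API.
    """
    last_start = event_time - (event_time % slide)  # latest window covering the event
    windows = []
    start = last_start
    while start > event_time - size:                # walk back one slide at a time
        windows.append((start, start + size))
        start -= slide
    return windows

T = 60_000                                          # window size: 60 s
windows = assign_windows(event_time=125_000, size=T, slide=T // 6)
print(len(windows))                                 # 6 overlapping windows cover the event
```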

When deploying to Kubernetes through my CI/CD process, there are sometimes
serialisation problems caused by changes to the Flink DAG or to the
checkpointed state, for instance when the definition of a state class changes
(a field is added or removed). As a result, the process cannot restart from
the savepoint that I take during a deploy. How could this be managed
efficiently? I understand that the way I am using windowing is not optimal
here, so let's not focus on those solutions.

Currently, my only approach is:
* Shut down the streaming process in a controlled manner (replacing the
running version with new configs, terminating the stream once events arrive
after a certain time-point).
* After termination, move the time-point (the offset; I am using Kafka)
backwards in time, in this case by T + eps, to allow the windows to be
re-buffered.
* Start the service reading from the new time-point, but do not emit any
output events until it has passed a defined time-point (in this case the
time-point of termination).
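The steps above can be sketched as follows (a minimal illustration in plain Python; the function names and the suppression rule are assumptions about the described procedure, not a real consumer API):

```python
# Sketch of the restart procedure: rewind the Kafka read position by T + eps,
# then suppress output until the stream catches up to the termination point.

def rewind_position(termination_time, window_size, eps):
    """New start time-point: far enough back to re-buffer every open window."""
    return termination_time - (window_size + eps)

def should_emit(window_end, termination_time):
    """Suppress results already emitted before shutdown; emit only windows
    that close after the termination time-point."""
    return window_end > termination_time

T, eps = 60_000, 5_000
stop_at = 1_000_000                               # time-point of controlled termination
start_from = rewind_position(stop_at, T, eps)     # re-reads T + eps of history

# Window end times seen during replay; only those closing after stop_at are emitted.
replayed = [w for w in range(900_000, 1_100_000, 10_000)
            if should_emit(w, stop_at)]
```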

Do you have any suggestions on how to improve this process?

Best regards and thanks in advance for any input,
William
 

 Flink Version: 1.6.2



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Sliding window buffering on restart without save point

Konstantin Knauf-2
Hi William, 

first of all, I would like to give you two pointers regarding state migration: 

* If you set UUIDs for all operators, you can change the topology of your job without breaking compatibility with the savepoint [1]. State will be matched to the operator with the same UUID.
* In Flink 1.7.x (I noticed you run 1.6.2, but still ;)) the community has improved the state migration capabilities of Flink quite a lot [2]. In particular, state migration works out of the box with Avro types as long as schema changes are backward compatible in the Avro sense, i.e. adding a field with a default value is no longer a problem. In the case of a window, the state type is determined either by the type of your events (no pre-aggregation) or by your aggregation type (with pre-aggregation).
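For illustration, a schema change that stays backward compatible in the Avro sense looks like this (a hypothetical record, not from the thread): the `source` field is newly added with a `"default"`, so state written with the old schema can still be read.

```json
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "value", "type": "double"},
    {"name": "source", "type": "string", "default": "unknown"}
  ]
}
```

Removing `"default"` from the new field, or deleting a field without one, would break schema resolution and hence state restoration.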

If you follow these suggestions, you should be able to avoid a good number of the savepoint incompatibilities. The strategy for the remaining cases depends, of course, on your setup. Do you use an idempotent sink, i.e. is it OK if the job emits the same (correct) result twice? More generally, what happens with the results of the aggregations?
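The idempotent-sink idea can be sketched in a few lines (plain Python with a hypothetical in-memory store, not Flink API): results are upserted keyed by (key, window start), so re-emitting a window after a rewind overwrites rather than duplicates.

```python
class UpsertSink:
    """Hypothetical idempotent sink: last write per (key, window) wins."""

    def __init__(self):
        self.store = {}

    def write(self, key, window_start, result):
        # Writing the same (key, window) twice is harmless: the replayed
        # result simply replaces the earlier one.
        self.store[(key, window_start)] = result

sink = UpsertSink()
sink.write("sensor-1", 120_000, 41)
sink.write("sensor-1", 120_000, 42)   # replayed after restart; overwrites
print(len(sink.store))                # 1 row, not 2
```

A keyed upsert table (e.g. one row per key and window start) gives the same guarantee in a real database-backed sink.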

Depending on your requirements https://issues.apache.org/jira/browse/FLINK-11458 might also be able to help you in the future.

Cheers, 

Konstantin

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/production_ready.html#set-uuids-for-operators
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
On Fri, Feb 8, 2019 at 9:52 AM shater93 <[hidden email]> wrote:
[quoted message trimmed]


--

Konstantin Knauf | Solutions Architect

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen