Watermarks as "process completion" flags

Anton Polyakov
Hi

I am very new to Flink and in fact have never used it. My task (which I currently solve with a home-grown Redis-based solution) is quite simple: I have a system which produces events (trades; it is a financial system) and a computational chain which computes a measure cumulatively over these events. The events form a long but finite stream, produced as a result of the end-of-day flow. The computational logic forms a processing DAG which computes a measure (VaR) over these events. Each trade is processed through the DAG and at different stages might produce a different set of subsequent events (such as return vectors); eventually they all arrive at an aggregator (a reducer) which computes the accumulated measure.

Ideally I would like to process trades as they appear (i.e. stream them), and once the producer reaches the end of the portfolio (there will be no more trades), I need to write the final resulting measure and mark it as the “end of day record”. Of course I could also use classical batch processing, i.e. wait until all trades are produced and then batch process them, but this would be too inefficient.

If I use Flink, I will need a sort of watermark saying “done, no more trades”, and once this watermark reaches the end of the DAG, the final measure can be saved. More generally, it would be cool to have an indication at the end of the DAG telling which input stream position the current measure corresponds to.

I feel my problem is very typical, yet I can’t find any solution. All examples either operate on infinite streams, where nobody cares about completion, or are classical batch examples which rely on all input data being ready.

Can you please give me a hint?

Thank you vm
Anton

Re: Watermarks as "process completion" flags

Maximilian Michels
Hi Anton,

You should be able to model your problem using the Flink Streaming
API. The actions you want to perform on the streamed records
correspond to transformations on Windows. You can indeed use
Watermarks to signal the window that a threshold for an action has
been reached. Otherwise an eviction policy should also do it.

Without more details about what you want to do I can only refer you to
the streaming API documentation:
Please see https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html

Thanks,
Max

Re: Watermarks as "process completion" flags

Anton Polyakov
Hi Max

thanks for the reply. From what I understand, a window buffers records while it is open, then applies the transformation once the window close is triggered and passes on the transformed result.
In my case the window would be open for a few hours, and the whole set of trades would be processed only once the window close is triggered. Actually I want to process events as they are produced, without buffering them. What I need is more like a stream with a special mark, whereas windowing seems more like a batch (if I understand it correctly).

In other words, buffering, waiting for the window to close and then processing is equivalent to simply doing one-off processing when all events have been produced. I am looking for a solution where I process events as they are produced, so that when the source signals "done" my processing is also nearly done.


Re: Watermarks as "process completion" flags

Fabian Hueske-2
Hi Anton,

If I got your requirements right, you are looking for a solution that continuously produces updated partial aggregates in a streaming fashion. When a special event (no more trades) is received, you would like to store the last update as a final result. Is that correct?

You can compute continuous updates using a reduce() or fold() function. These will produce a new update for each incoming event.
For example:

// Rolling sum per key: x is the aggregate so far, y is the incoming record.
val s: DataStream[(Int, Long)] = ...
s.keyBy(_._1)
  .reduce( (x, y) => (x._1, x._2 + y._2) )

would continuously compute a sum for every key (_._1) and produce an update for each incoming record.
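
To make the "one update per incoming record" behaviour concrete, here is a minimal sketch using plain Scala collections instead of Flink. It only models the semantics described above; the names `RollingReduce` and `rollingSum` are made up for this illustration and are not part of any Flink API.

```scala
object RollingReduce {
  // Model of a keyed rolling reduce: for every incoming record, emit the
  // updated running sum for that record's key.
  def rollingSum(records: Seq[(Int, Long)]): Seq[(Int, Long)] =
    records
      .foldLeft((Map.empty[Int, Long], Vector.empty[(Int, Long)])) {
        case ((totals, out), (key, value)) =>
          val updated = totals.getOrElse(key, 0L) + value
          (totals.updated(key, updated), out :+ ((key, updated)))
      }
      ._2
}
```

Note that the output has exactly as many elements as the input: nothing is buffered until the end, and each record immediately yields a new partial aggregate.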

You could add a flag to the record and implement a ReduceFunction that marks a record as final when the no-more-trades event is received.
With a filter and a data sink you could emit such final records to a persistent data store.
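
A rough sketch of the flag idea, again in plain Scala rather than against the Flink API. The record type `Update` and its `isFinal` field are hypothetical, and the sketch assumes a single key whose no-more-trades marker arrives after all of its trades.

```scala
object FinalFlag {
  // Hypothetical record type: a measure plus an end-of-day flag.
  final case class Update(key: Int, measure: Long, isFinal: Boolean)

  // Reduce step: add the measures and mark the result as final as soon as
  // either side carries the no-more-trades flag.
  def combine(acc: Update, next: Update): Update =
    Update(acc.key, acc.measure + next.measure, acc.isFinal || next.isFinal)

  // Rolling reduce for one key, followed by the "filter" step that keeps
  // only the updates marked as final (these would go to the data sink).
  def finalUpdates(records: Seq[Update]): Seq[Update] =
    if (records.isEmpty) Seq.empty
    else records.tail.scanLeft(records.head)(combine).filter(_.isFinal)
}
```

The marker is modelled as an ordinary record with a zero measure and `isFinal = true`, so the same reduce function handles both trades and the end-of-day signal.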

Btw.: You can also define custom trigger policies for windows. A custom trigger is called for each element that is added to a window and when certain timers expire. For example with a custom trigger, you can evaluate a window for every second element that is added. You can also define whether the elements in the window should be retained or removed after the evaluation.
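
The difference between retaining and removing elements after an evaluation can be illustrated with a small model in plain Scala. This is not Flink's trigger interface; `CountTriggerModel` and its parameters are invented here purely to show the two behaviours, using a sum as the window function.

```scala
object CountTriggerModel {
  // Evaluate the window (here: sum its contents) after every `every` elements.
  // With purge = true the buffered elements are dropped after each evaluation;
  // with purge = false they are retained and included in later evaluations.
  def evaluate(elements: Seq[Long], every: Int, purge: Boolean): Seq[Long] = {
    require(every > 0, "every must be positive")
    val (_, results) =
      elements.zipWithIndex.foldLeft((Vector.empty[Long], Vector.empty[Long])) {
        case ((window, out), (element, index)) =>
          val grown = window :+ element
          if ((index + 1) % every == 0)
            (if (purge) Vector.empty[Long] else grown, out :+ grown.sum)
          else
            (grown, out)
      }
    results
  }
}
```

With `every = 2`, retaining elements gives running totals over everything seen so far, while purging gives one independent total per pair of elements.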

Best, Fabian



Re: Watermarks as "process completion" flags

Anton Polyakov
Hi Fabian

Defining a special flag on a record seems like a checkpoint barrier. I think I would end up re-implementing checkpointing myself. I found a discussion on flink-dev: mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/… which seems to solve my task. Essentially they want a mechanism which marks a record produced by a job as “last” and then waits until it has fully propagated through the DAG, similar to what I need. My job which produces trades can also be thought of as finished once it has produced all trades; then I just need to wait until the last trade produced by this job is processed.

So although windows can probably also be applied, I think propagating a barrier through the DAG and checkpointing at the final job is what I need.

Can I possibly use Flink’s internal checkpoint barriers (e.g. by triggering a custom checkpoint or finishing the streaming job)?

Re: Watermarks as "process completion" flags

Anton Polyakov
I think I can turn my problem into a simpler one.

Effectively, I need a way to checkpoint certain events in the input stream and, once this checkpoint reaches the end of the DAG, take some action. So I need a signal at the sink which tells me "all events in the source before the checkpointed event have now been processed".

As far as I understand, a flagged record doesn't quite work, since the DAG doesn't propagate source events one-to-one. Some transformations might create 3 child events out of 1 source event. If I want to make sure I have fully processed a source event, I need to wait until all of its children are processed.
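
The requirement can be modelled in a few lines of plain Scala. This is only a sketch of the idea, not Flink's barrier implementation: all names (`MarkerAlignment`, `Record`, `EndMarker`, `fanOut`, `sink`) are hypothetical, and it assumes each channel delivers items in order. An operator that fans records out forwards the marker behind the children it produced, and the sink reports a result only once the marker has arrived on every input channel.

```scala
object MarkerAlignment {
  sealed trait Item
  final case class Record(value: Long) extends Item
  case object EndMarker extends Item

  // Fans each record out into three children. Because the channel preserves
  // order, everything derived from earlier records stays ahead of the marker.
  def fanOut(channel: Seq[Item]): Seq[Item] = channel.flatMap {
    case Record(v) => Seq(Record(v), Record(v * 2), Record(v * 3))
    case EndMarker => Seq(EndMarker)
  }

  // Sink: sums the records that precede the marker on each channel. Returns
  // the total once the marker has been seen on all channels, None otherwise.
  def sink(arrivals: Seq[(Int, Item)], channels: Int): Option[Long] = {
    val (total, done) = arrivals.foldLeft((0L, Set.empty[Int])) {
      case ((sum, seen), (channel, Record(v))) =>
        if (seen(channel)) (sum, seen) else (sum + v, seen)
      case ((sum, seen), (channel, EndMarker)) =>
        (sum, seen + channel)
    }
    if (done.size == channels) Some(total) else None
  }
}
```

Waiting for the marker on all channels is what makes the fan-out harmless: the sink never needs to know how many children a source event produced, only that each channel has passed the marker.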



Re: Watermarks as "process completion" flags

Stephan Ewen
Hi Anton!

That you can do!

You can look at the interfaces "Checkpointed" and "CheckpointNotifier". There you get a call at every checkpoint (and can see which records came before that checkpoint). You also get a call once the checkpoint is complete, which corresponds to the point when everything has flowed through the DAG.

I think it is nice to implement it like that, because it is non-blocking: the stream continues while the records you are waiting for flow through the DAG, and you get an asynchronous notification once they have flowed all the way through.
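
The two callbacks can be pictured with a toy model in plain Scala. This deliberately does not reproduce the signatures of Flink's interfaces: `OffsetTracker`, `onSnapshot` and `onComplete` are hypothetical stand-ins for "a call at every checkpoint" and "a call once the checkpoint is complete", and the end-of-day offset is an assumed input.

```scala
object CheckpointCallbacks {
  // Toy stand-in for a source that remembers which input offset each
  // checkpoint covered and reacts when that checkpoint is reported complete.
  final class OffsetTracker(endOfDayOffset: Long) {
    // checkpoint id -> source offset at the time the checkpoint was taken
    private var pending = Map.empty[Long, Long]
    private var endOfDayDone = false

    // Called when a checkpoint is taken: remember how far the source had read.
    def onSnapshot(checkpointId: Long, currentOffset: Long): Unit =
      pending += (checkpointId -> currentOffset)

    // Called when the checkpoint is complete, i.e. everything read before it
    // has passed through the whole DAG.
    def onComplete(checkpointId: Long): Unit = {
      pending.get(checkpointId).foreach { offset =>
        if (offset >= endOfDayOffset) endOfDayDone = true
      }
      // Older checkpoints are covered by this one and can be forgotten.
      pending = pending.filter { case (id, _) => id > checkpointId }
    }

    def isEndOfDayProcessed: Boolean = endOfDayDone
  }
}
```

The tracker never blocks the stream: taking a snapshot only records an offset, and the end-of-day decision is made later, when the completion notification for a checkpoint covering the last trade arrives.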

Greetings,
Stephan


Re: Watermarks as "process completion" flags

Anton Polyakov
Hi Stephan

thanks, that looks super. But then the source needs to emit the checkpoint. At the source, while reading source events, I can find out that this is the source event I want to take action after. So if I can emit a checkpoint at the source at that point and catch it at the end of the DAG, that would solve my problem (well, I also need to somehow distinguish my checkpoint from Flink's auto-generated ones).

Sorry for being so chatty; this is a topic where I need an expert opinion, and I can't find the answer by just googling.


On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen <[hidden email]> wrote:
Hi Anton!

That you can do!

You can look at the interfaces "Checkpointed" and "CheckpointNotifier". There you will get a call at every checkpoint (and can look at what records are before that checkpoint). You also get a call once the checkpoint is complete, which corresponds to the point when everything has flowed through the DAG.

I think it is nice to implement it like that, because it works non-blocking: the stream continues while the records you are waiting for flow through the DAG, and you get an asynchronous notification once they have flowed all the way through.

Greetings,
Stephan


On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <[hidden email]> wrote:
I think I can turn my problem into a simpler one.

Effectively, what I need is a way to checkpoint certain events in the input stream and, once this checkpoint reaches the end of the DAG, take some action. So I need a signal at the sink which tells me "all events in the source before the checkpointed event are now processed".

As far as I understand, a flagged record doesn't quite work, since the DAG doesn't propagate source events one-to-one. Some transformations might create 3 child events out of 1 source event. If I want to make sure I have fully processed a source event, I need to wait until all its children are processed.



On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <[hidden email]> wrote:
Hi Fabian

Defining a special flag for a record looks a lot like a checkpoint barrier, so I think I would end up re-implementing checkpointing myself. I found a discussion on flink-dev, mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…, which seems to address my task. Essentially they want a mechanism that marks a record produced by a job as “last” and then waits until it has fully propagated through the DAG, which is similar to what I need. My job that produces trades can also be thought of as finished once it has produced all trades; then I just need to wait until the last trade produced by this job has been processed.

So although windows could probably also be applied, I think propagating a barrier through the DAG and checkpointing at the final job is what I need.

Can I possibly make use of Flink’s internal checkpoint barriers (e.g. by triggering a custom checkpoint or finishing the streaming job)?

On 24 Nov 2015, at 21:53, Fabian Hueske <[hidden email]> wrote:

Hi Anton,

If I got your requirements right, you are looking for a solution that continuously produces updated partial aggregates in a streaming fashion. When a special event (no more trades) is received, you would like to store the last update as a final result. Is that correct?

You can compute continuous updates using a reduce() or fold() function. These will produce a new update for each incoming event.
For example:

val s: DataStream[(Int, Long)] = ...
s.keyBy(_._1)
  // keep the key; add the running sum (x) to the incoming value (y)
  .reduce( (x,y) => (x._1, x._2 + y._2) )

would continuously compute a sum for every key (_._1) and produce an update for each incoming record.
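
To make the "one update per incoming record" behaviour concrete, here is a minimal plain-Scala sketch that models a keyed rolling sum on an in-memory sequence. It does not use Flink at all; the object name `RollingReduceSketch` and the function `rollingSum` are made up for illustration.

```scala
// Plain-Scala model of a keyed rolling reduce; no Flink dependency.
// All names here are hypothetical and only illustrate the behaviour.
object RollingReduceSketch {
  // Emits one updated (key, sum) pair per incoming record,
  // the way a keyed rolling reduce emits an update per event.
  def rollingSum(events: Seq[(Int, Long)]): Seq[(Int, Long)] = {
    val state = scala.collection.mutable.Map.empty[Int, Long]
    events.map { case (key, value) =>
      val updated = state.getOrElse(key, 0L) + value
      state(key) = updated // remember the running sum for this key
      (key, updated)
    }
  }
}
```

For the input (1, 10), (2, 5), (1, 7) this yields three updates, the last one carrying the sum 17 for key 1.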

You could add a flag to the record and implement a ReduceFunction that marks a record as final when the no-more-trades event is received.
With a filter and a data sink you could emit such final records to a persistent data store.
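
A rough plain-Scala sketch of the flag-and-filter idea follows. It is only a model on an in-memory sequence, not Flink code: the record type `Agg`, its `isFinal` field and the helper names are assumptions made for this example. It also assumes the no-more-trades marker arrives after every trade for that key on a single ordered stream.

```scala
// Plain-Scala model of "flag the record, reduce, then filter"; no Flink dependency.
object FinalFlagSketch {
  // Hypothetical record type: a partial sum plus a flag set by the end-of-input marker.
  final case class Agg(key: Int, sum: Long, isFinal: Boolean)

  // Reduce step: add the sums and mark the result final once a final-flagged record is seen.
  def combine(x: Agg, y: Agg): Agg =
    Agg(x.key, x.sum + y.sum, x.isFinal || y.isFinal)

  // Rolling reduce over one key's records, followed by the filter that keeps only final results.
  def finalResults(records: Seq[Agg]): Seq[Agg] =
    if (records.isEmpty) Seq.empty
    else records.tail.scanLeft(records.head)(combine).filter(_.isFinal)
}
```

Every intermediate update is still produced by the reduce step; the filter merely decides which of them reach the persistent store.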

Btw.: You can also define custom trigger policies for windows. A custom trigger is called for each element that is added to a window and when certain timers expire. For example with a custom trigger, you can evaluate a window for every second element that is added. You can also define whether the elements in the window should be retained or removed after the evaluation.
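
The "evaluate for every second element" behaviour, and the choice between retaining and removing elements, can be modelled in plain Scala as below. This is not the Flink trigger API; `CountTriggerSketch.run` and its parameters `n` and `purge` are hypothetical names for this sketch.

```scala
// Plain-Scala model of a count-based window trigger; no Flink dependency.
object CountTriggerSketch {
  // Evaluates `f` over the buffered elements each time `n` new elements have arrived.
  // If `purge` is true the buffer is cleared after each evaluation, otherwise it is retained.
  def run[A, B](elements: Seq[A], n: Int, purge: Boolean)(f: Seq[A] => B): Seq[B] = {
    val buffer = scala.collection.mutable.ArrayBuffer.empty[A]
    val out = scala.collection.mutable.ArrayBuffer.empty[B]
    var sinceLastFire = 0
    for (e <- elements) {
      buffer += e
      sinceLastFire += 1
      if (sinceLastFire == n) {
        out += f(buffer.toList) // the trigger fires: evaluate the window contents
        sinceLastFire = 0
        if (purge) buffer.clear()
      }
    }
    out.toList
  }
}
```

With elements 1 to 5 and `n = 2`, summing a retained window gives 3 and then 10, while summing a purged window gives 3 and then 7; the fifth element never causes an evaluation.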

Best, Fabian



2015-11-24 21:32 GMT+01:00 Anton Polyakov <[hidden email]>:
Hi Max

Thanks for the reply. From what I understand, a window buffers records while it is open, then applies the transformation once the window close is triggered and passes on the transformed result.
In my case the window would stay open for a few hours, and the whole set of trades would only be processed once the window close is triggered. What I actually want is to process events as they are produced, without buffering them. That is more like a stream with a special marker in it, whereas windowing seems more like a batch (if I understand it correctly).

In other words, buffering events, waiting for the window to close and only then processing them is equivalent to doing one-off processing after all events have been produced. I am looking for a solution where I process events as they are produced, so that when the source signals "done" my processing is also nearly done.


On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels <[hidden email]> wrote:
Hi Anton,

You should be able to model your problem using the Flink Streaming
API. The actions you want to perform on the streamed records
correspond to transformations on Windows. You can indeed use
Watermarks to signal the window that a threshold for an action has
been reached. Otherwise an eviction policy should also do it.

Without more details about what you want to do I can only refer you to
the streaming API documentation:
Please see https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html

Thanks,
Max

On Sun, Nov 22, 2015 at 8:53 PM, Anton Polyakov
<[hidden email]> wrote:
> Hi
>
> I am very new to Flink and in fact never used it. My task (which I currently solve using home grown Redis-based solution) is quite simple - I have a system which produces some events (trades, it is a financial system) and computational chain which computes some measure accumulatively over these events. Those events form a long but finite stream, they are produced as a result of end of day flow. Computational logic forms a processing DAG which computes some measure over these events (VaR). Each trade is processed through DAG and at different stages might produce different set of subsequent events (like return vectors), eventually they all arrive into some aggregator which computes accumulated measure (reducer).
>
> Ideally I would like to process trades as they appear (i.e. stream them) and once producer reaches end of portfolio (there will be no more trades), I need to write final resulting measure and mark it as “end of day record”. Of course I also could use a classical batch - i.e. wait until all trades are produced and then batch process them, but this will be too inefficient.
>
> If I use Flink, I will need a sort of watermark saying - “done, no more trades” and once this watermark reaches end of DAG, final measure can be saved. More generally would be cool to have an indication at the end of DAG telling to which input stream position current measure corresponds.
>
> I feel my problem is very typical yet I can’t find any solution. All examples operate either on infinite streams where nobody cares about completion or classical batch examples which rely on fact all input data is ready.
>
> Can you please hint me.
>
> Thank you vm
> Anton







Re: Watermarks as "process completion" flags

Stephan Ewen
Hi!

If you implement the "Checkpointed" interface, you get the function calls to "snapshotState()" at the point when the checkpoint barrier arrives at an operator. So, the call to "snapshotState()" in the sink is when the barrier reaches the sink. The call to "checkpointComplete()" in the sources comes after all barriers have reached all sinks.

Have a look here for an illustration about barriers flowing with the stream: https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html
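
As a rough illustration of why a barrier behaves differently from a flagged record when one source event fans out into several children, here is a plain-Scala model of a single linear pipeline. It is a simplification and not Flink code: the types `Record` and `Barrier`, the `fanOut` parameter and the function `run` are invented for this sketch, and real Flink additionally aligns barriers across parallel input channels.

```scala
// Plain-Scala model of barriers flowing with the records; no Flink dependency.
object BarrierFlowSketch {
  sealed trait Element
  final case class Record(value: Int) extends Element
  final case class Barrier(checkpointId: Long) extends Element

  // Pipeline: a fan-out step (each record yields `fanOut` children) followed by a summing sink.
  // Barriers are forwarded in stream order, so they stay behind all children of earlier records.
  // Returns the sink's running sum as seen when each barrier arrives: (checkpointId, sum).
  def run(input: Seq[Element], fanOut: Int): Seq[(Long, Long)] = {
    val afterFanOut: Seq[Element] = input.flatMap {
      case Record(v)  => Seq.fill(fanOut)(Record(v))
      case b: Barrier => Seq(b)
    }
    var sum = 0L
    val snapshots = scala.collection.mutable.ArrayBuffer.empty[(Long, Long)]
    afterFanOut.foreach {
      case Record(v)   => sum += v
      case Barrier(id) => snapshots += ((id, sum)) // models the snapshot call at the sink
    }
    snapshots.toList
  }
}
```

In this model, the value the sink sees at a barrier always includes every child of every record that preceded the barrier at the source, however many children each record produced.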

Stephan










Re: Watermarks as "process completion" flags

Anton Polyakov
Hi Stephan

Sorry if I misunderstand, but how do I make sure the barrier is placed at the proper time? How does my source "force" a checkpoint to start once it finds that all needed elements have been produced?










Re: Watermarks as "process completion" flags

Stephan Ewen
You cannot force a barrier at one point in time. At what time checkpoints are triggered is decided by the master node.

I think in your case you can use the checkpoint and notification calls to figure out when data has flowed through the DAG, but you cannot force a barrier at a specific point.
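
One way to read this suggestion is sketched below in plain Scala: since the barrier cannot be forced, the source remembers the first checkpoint whose snapshot was taken after its last record was emitted, and treats the completion of that checkpoint (or a later one) as the "everything has flowed through" signal. The `Tracker` class and its method names are hypothetical and are not part of Flink; the sketch assumes checkpoint IDs increase over time and that the source stays alive until that checkpoint completes.

```scala
// Plain-Scala bookkeeping sketch; no Flink dependency, all names are hypothetical.
object EndOfInputTracker {
  final class Tracker {
    private var inputDone = false
    private var firstCoveringCheckpoint: Option[Long] = None

    // Call when the source has emitted its last record.
    def markInputDone(): Unit = inputDone = true

    // Call from the source's snapshot callback for each checkpoint.
    def onSnapshot(checkpointId: Long): Unit =
      if (inputDone && firstCoveringCheckpoint.isEmpty)
        firstCoveringCheckpoint = Some(checkpointId)

    // Call from the completion notification. Returns true once a checkpoint
    // taken after the end of input has completed.
    def onCheckpointComplete(checkpointId: Long): Boolean =
      firstCoveringCheckpoint.exists(checkpointId >= _)
  }
}
```

The delay between the last record and the final signal is then bounded by the checkpoint interval plus the time the barrier needs to travel through the DAG.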

On Mon, Nov 30, 2015 at 3:33 PM, Anton Polyakov <[hidden email]> wrote:
Hi Stephan

Sorry if I misunderstand, but how do I make sure the barrier is placed at the proper time? How does my source "force" a checkpoint to start once it finds that all needed elements have been produced?

On Mon, Nov 30, 2015 at 2:13 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

If you implement the "Checkpointed" interface, you get the function calls to "snapshotState()" at the point when the checkpoint barrier arrives at an operator. So, the call to "snapshotState()" in the sink is when the barrier reaches the sink. The call to "checkpointComplete()" in the sources comes after all barriers have reached all sinks.

Have a look here for an illustration about barriers flowing with the stream: https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html

Stephan


On Mon, Nov 30, 2015 at 11:51 AM, Anton Polyakov <[hidden email]> wrote:
Hi Stephan

Thanks, that looks super. But then the source needs to emit the checkpoint. At the source, while reading source events, I can tell which source event I want to take action after. So if I could emit a checkpoint at the source at that point and catch it at the end of the DAG, that would solve my problem (well, I would also need to somehow distinguish my checkpoint from Flink's auto-generated ones).

Sorry for being so chatty; this is a topic where I need an expert opinion, and I can't find the answer just by googling.


On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen <[hidden email]> wrote:
Hi Anton!

That you can do!

You can look at the interfaces "Checkpointed" and "CheckpointNotifier". With them you get a call at every checkpoint (and can see which records come before that checkpoint). You also get a call once the checkpoint is complete, which corresponds to the point when everything has flowed through the DAG.

I think it is nice to implement it like that, because it is non-blocking: the stream continues while the records you are waiting for flow through the DAG, and you get an asynchronous notification once they have flowed all the way through.

Greetings,
Stephan


On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <[hidden email]> wrote:
I think I can turn my problem into a simpler one.

Effectively, I need a way to checkpoint certain events in the input stream and, once this checkpoint reaches the end of the DAG, take some action. So I need a signal at the sink which tells me "all events in the source before the checkpointed event are now processed".

As far as I understand, flagged records don't quite work, since the DAG doesn't propagate source events one-to-one. Some transformations might create 3 child events out of 1 source event. If I want to make sure I have fully processed a source event, I need to wait until all its children are processed.



On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <[hidden email]> wrote:
Hi Fabian

Defining a special flag for a record seems like a checkpoint barrier. I think I will end up re-implementing checkpointing myself. I found the discussion in flink-dev: mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/… which seems to solve my task. Essentially they want a mechanism which marks a record produced by a job as “last” and then waits until it is fully propagated through the DAG, which is similar to what I need. My job which produces trades can also be thought of as finished once it has produced all trades; then I just need to wait until the latest trade produced by this job is processed.

So although windows can probably also be applied, I think propagating a barrier through the DAG and checkpointing at the final job is what I need.

Can I possibly utilize Flink’s internal checkpoint barriers (e.g. by triggering a custom checkpoint or finishing the streaming job)?

On 24 Nov 2015, at 21:53, Fabian Hueske <[hidden email]> wrote:

Hi Anton,

If I got your requirements right, you are looking for a solution that continuously produces updated partial aggregates in a streaming fashion. When a special event (no more trades) is received, you would like to store the last update as the final result. Is that correct?

You can compute continuous updates using a reduce() or fold() function. These will produce a new update for each incoming event.
For example:

val s: DataStream[(Int, Long)] = ...
s.keyBy(_._1)
  .reduce( (x, y) => (x._1, x._2 + y._2) )

would continuously compute a sum for every key (_._1) and produce an update for each incoming record.

You could add a flag to the record and implement a ReduceFunction that marks a record as final when the no-more-trades event is received.
With a filter and a data sink you could emit such final records to a persistent data store.
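
The flagged-record idea could look roughly like the following. This is a plain-Scala simulation using collections rather than the Flink API, so it runs without any dependencies; Update, combine and runningUpdates are hypothetical names, and the sketch assumes the "no more trades" marker arrives after all other records of its key.

```scala
// Plain-Scala simulation of a streaming reduce with a "final" flag.
// No Flink dependency; all names are hypothetical and for illustration only.
object FinalFlagSketch {
  // Key, running sum, and a flag that turns true once the end marker was seen.
  final case class Update(key: Int, sum: Long, isFinal: Boolean)

  // The reduce step: add the values and carry the flag forward.
  def combine(acc: Update, next: Update): Update =
    Update(acc.key, acc.sum + next.sum, acc.isFinal || next.isFinal)

  // Mimics a streaming reduce on one key: one updated result per input record.
  def runningUpdates(events: Seq[Update]): Seq[Update] =
    if (events.isEmpty) Seq.empty
    else events.tail.scanLeft(events.head)(combine)

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Update(1, 10L, isFinal = false),
      Update(1, 5L, isFinal = false),
      Update(1, 0L, isFinal = true) // the "no more trades" marker adds nothing
    )
    // Keep only the update flagged as final, as a filter before the sink would.
    runningUpdates(events).filter(_.isFinal).foreach(println) // Update(1,15,true)
  }
}
```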

Btw.: You can also define custom trigger policies for windows. A custom trigger is called for each element that is added to a window and when certain timers expire. For example with a custom trigger, you can evaluate a window for every second element that is added. You can also define whether the elements in the window should be retained or removed after the evaluation.

Best, Fabian



2015-11-24 21:32 GMT+01:00 Anton Polyakov <[hidden email]>:
Hi Max

Thanks for the reply. From what I understand, a window buffers records while it is open, then applies the transformation once the window close is triggered and passes on the transformed result.
In my case the window would be open for a few hours, and the whole set of trades would be processed only once the window close is triggered. What I actually want is to process events as they are produced, without buffering them. That is more like a stream with a special mark, whereas windowing seems more like a batch (if I understand it correctly).

In other words, buffering, waiting for the window to close and then processing would be equivalent to simply doing one-off processing once all events are produced. I am looking for a solution where I process events as they are produced, so that when the source signals "done" my processing is also nearly done.


On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels <[hidden email]> wrote:
Hi Anton,

You should be able to model your problem using the Flink Streaming
API. The actions you want to perform on the streamed records
correspond to transformations on Windows. You can indeed use
Watermarks to signal the window that a threshold for an action has
been reached. Otherwise an eviction policy should also do it.

Without more details about what you want to do I can only refer you to
the streaming API documentation:
Please see https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html

Thanks,
Max

On Sun, Nov 22, 2015 at 8:53 PM, Anton Polyakov
<[hidden email]> wrote:
> Hi
>
> I am very new to Flink and in fact never used it. My task (which I currently solve using home grown Redis-based solution) is quite simple - I have a system which produces some events (trades, it is a financial system) and computational chain which computes some measure accumulatively over these events. Those events form a long but finite stream, they are produced as a result of end of day flow. Computational logic forms a processing DAG which computes some measure over these events (VaR). Each trade is processed through DAG and at different stages might produce different set of subsequent events (like return vectors), eventually they all arrive into some aggregator which computes accumulated measure (reducer).
>
> Ideally I would like to process trades as they appear (i.e. stream them) and once producer reaches end of portfolio (there will be no more trades), I need to write final resulting measure and mark it as “end of day record”. Of course I also could use a classical batch - i.e. wait until all trades are produced and then batch process them, but this will be too inefficient.
>
> If I use Flink, I will need a sort of watermark saying - “done, no more trades” and once this watermark reaches end of DAG, final measure can be saved. More generally would be cool to have an indication at the end of DAG telling to which input stream position current measure corresponds.
>
> I feel my problem is very typical yet I can’t find any solution. All examples operate either on infinite streams where nobody cares about completion or classical batch examples which rely on fact all input data is ready.
>
> Can you please hint me.
>
> Thank you vm
> Anton

Watermarks as "process completion" flags

Anton Polyakov
I think overall it would be a very useful feature to be able to track the progress of source stream events by attaching barriers to them and reacting to them in the processing stages. Working with time windows can't help, since processing can involve long-running operations (e.g. DB queries), and working with markers/event counts can't work either, as events might spawn child events during processing.

However, without the ability to specify where in the source you put a barrier, one can't do it.

