does the flink sink only support bio?

does the flink sink only support bio?

Jinhua Luo
Hi, all.

The invoke method of a sink seems to offer no way to do async I/O, e.g. by returning a Future?

For example, the redis connector uses the jedis lib to execute redis
commands synchronously:

https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java

So it will block the Flink task thread while it waits for the network
response from the redis server for every command?! Can other
operators run in the same thread as the sink? If so, would it
block them too?

I know Flink has an async I/O API, but it does not seem to be meant for sink implementations?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

Thanks.

Re: does the flink sink only support bio?

Stefan Richter
Hi,

Flink currently does not offer async sinks out of the box, but there is no fundamental obstacle to having them and we will probably offer something in this direction in the future. In the meantime, you can build something like this by replacing the sink with an async io operator that acts as the sink (i.e. does the writes to the db), followed by a discarding sink for compliance with the API.

You need to be a bit careful if your sink needs exactly-once semantics. In this case the writes should either be idempotent or the db must support rolling back changes between checkpoints, e.g. via transactions. Commits should be triggered for confirmed checkpoints ("notifyCheckpointComplete").

Your assumptions about the blocking behavior of the non-async sinks are correct.

Best,
Stefan
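
To make the blocking behaviour discussed above concrete, here is a minimal plain-Java sketch. It deliberately uses neither Flink nor jedis: `FakeStore` and `WriteStyles` are hypothetical names, and the 5 ms sleep merely stands in for a network round trip. It only contrasts waiting per record with keeping several writes in flight, which is roughly what an async io operator acting as a sink would buy you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for a remote store such as Redis; not a real client API.
class FakeStore {
    final ConcurrentLinkedQueue<String> written = new ConcurrentLinkedQueue<>();

    // Blocking write: the caller waits for the simulated network round trip.
    void writeBlocking(String record) {
        try {
            Thread.sleep(5); // simulated latency (assumption)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        written.add(record);
    }
}

class WriteStyles {
    // Blocking style: one round trip per record on the calling thread.
    static void writeAllBlocking(FakeStore store, List<String> records) {
        for (String r : records) {
            store.writeBlocking(r);
        }
    }

    // Async style: hand each write to a pool and wait once at the end.
    static void writeAllAsync(FakeStore store, List<String> records, ExecutorService pool) {
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        for (String r : records) {
            inFlight.add(CompletableFuture.runAsync(() -> store.writeBlocking(r), pool));
        }
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
    }
}
```

With a pool of a few threads, `writeAllAsync` overlaps the round trips instead of paying for them one after another. Note that the async variant does not preserve write order, which may or may not matter for your store.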


Re: does the flink sink only support bio?

Jinhua Luo
Thank you very much!

I have two new questions:

1) the async operator must emit some value to the async collector
(even if it acts as a sink), right?

2) How could I use CheckpointListener with an async operator? Could you
give a simple example or point me to a doc page?



Re: does the flink sink only support bio?

Stefan Richter

> I have two new questions:
>
> 1) the async operator must emit some value to the async collector
> (even it acts as a sink), right?
>

I think so, but you should be able to simply return an empty collection.

> 2) How could I use CheckpointListener with async operator? Could you
> give a simple example or doc page?
>

There is an interface "CheckpointListener" that your user function can implement; it gives you a callback for completed checkpoints.

Re: does the flink sink only support bio?

Jinhua Luo
In reply to this post by Stefan Richter
2017-12-08 18:25 GMT+08:00 Stefan Richter <[hidden email]>:
> You need to be a bit careful if your sink needs exactly-once semantics. In this case things should either be idempotent or the db must support rolling back changes between checkpoints, e.g. via transactions. Commits should be triggered for confirmed checkpoints („notifyCheckpointComplete“).

I suspect we have a risk here: when notifyCheckpointComplete is called, the
checkpoint has already completed. If the process crashes (or the machine
fails) before it commits to the db, Flink would restart the app and
restore the state from the last checkpoint, but it would not invoke
notifyCheckpointComplete again, correct? If so, we would lose the
database writes for the data between the last two checkpoints, am I
correct?

Re: does the flink sink only support bio?

Stefan Richter


> On 01.01.2018 at 15:22, Jinhua Luo <[hidden email]> wrote:
>
> 2017-12-08 18:25 GMT+08:00 Stefan Richter <[hidden email]>:
>> You need to be a bit careful if your sink needs exactly-once semantics. In this case things should either be idempotent or the db must support rolling back changes between checkpoints, e.g. via transactions. Commits should be triggered for confirmed checkpoints („notifyCheckpointComplete“).
>
> I doubt if we have a risk here: in notifyCheckpointComplete, the
> checkpoint was completed, and if the process crashes (or machine
> failure) before it commits the db, the flink would restart the app,
> restoring the state from the last checkpoint, but it would not invoke
> notifyCheckpointComplete again? correct? if so, we would miss the
> database ingress for the data between the last two checkpoints, am I
> correct?

Yes, that is correct. What I was talking about was more the opposite problem, i.e. committing too early. In that case, you could have committed for a checkpoint that failed afterwards, and recovery will start from an earlier checkpoint but with your commit already applied. You should only commit after you have received the notification, or else your semantics can degrade to "at-least-once".
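
The "committing too early" problem can be sketched with a small toy model in plain Java. This is not Flink code: `CommitTimingModel` and its methods are hypothetical names that only borrow the vocabulary of the thread, and the model keeps its pending records in memory, so it does not cover the lost-commit case raised in the question (a crash between the checkpoint completing and the commit).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (assumption, not Flink API): a sink that either commits every
// record immediately or holds records back until the checkpoint is confirmed.
class CommitTimingModel {
    final List<String> committed = new ArrayList<>();       // what the "db" has durably
    private final List<String> open = new ArrayList<>();    // records since the last snapshot
    private final List<String> pending = new ArrayList<>(); // snapshotted, awaiting confirmation
    private final boolean commitEarly;

    CommitTimingModel(boolean commitEarly) {
        this.commitEarly = commitEarly;
    }

    void invoke(String record) {
        if (commitEarly) {
            committed.add(record); // visible before any checkpoint covers it
        } else {
            open.add(record);
        }
    }

    // Checkpoint barrier: everything seen so far now belongs to this checkpoint.
    void snapshot() {
        pending.addAll(open);
        open.clear();
    }

    // Checkpoint confirmed: only now make the held-back records visible.
    void notifyCheckpointComplete() {
        committed.addAll(pending);
        pending.clear();
    }

    // Failure before confirmation: uncommitted work is dropped and the
    // source will replay the same records.
    void failAndRestore() {
        open.clear();
        pending.clear();
    }
}
```

Feeding "a", "b", then failing, then replaying "a", "b" and completing a checkpoint leaves four rows in the early-commit variant (duplicates, i.e. at-least-once) and two rows in the variant that waits for the notification.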


Re: does the flink sink only support bio?

Jinhua Luo
Then how do I implement exactly-once async io? That is, neither missing
data nor duplicating data.

Is there some way to index data by checkpoint id and record which
checkpoints have already been committed to the db? But that means we need
MapState, right?

However, the async-io operator normally follows other operators, e.g.
fold, so it normally receives a DataStream rather than a KeyedStream, and
DataStream only supports ListState, right?



Re: does the flink sink only support bio?

Stefan Richter

Hi,


> Then how to implement exactly-once async io? That is, neither missing
> data or duplicating data.

From the docs about async IO here
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
:

"Fault Tolerance Guarantees:
The asynchronous I/O operator offers full exactly-once fault tolerance guarantees. It stores the records for in-flight asynchronous requests in checkpoints and restores/re-triggers the requests when recovering from a failure."

So it is already handled by Flink in a way that supports exactly-once.

> Is there some way to index data by checkpoint id and records which
> checkpoints already commit to db? But that means we need MapState,
> right?

The information required depends a bit on the store that you are using; maybe the last confirmed checkpoint id is enough, but maybe you require something more. This transaction information is probably not "by-key" but "per-operator", so I would suggest using operator state (see next answer). Btw, the implementation of async operators does something very similar to restore pending requests, and you can see the code in "AsyncWaitOperator".

> However, the async-io operator normally follows other operators, e.g.
> fold, so it normally faces the DataStream but not KeyedStream, and
> DataStream only supports ListState, right?

You can use non-keyed state, aka operator state, to store such information. See here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state . It does not require a KeyedStream.

Best,
Stefan
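
One way to read the "last confirmed checkpoint id" idea is to have the store itself remember which checkpoint it applied last, so that a retried commit becomes a no-op. The following plain-Java sketch is only an illustration under that assumption: `CheckpointAwareDb` is a hypothetical in-memory stand-in, and with a real database the rows and the checkpoint id would have to be written in one transaction for this to hold.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a transactional store (assumption).
class CheckpointAwareDb {
    private long lastCommittedCheckpoint = -1;
    final List<String> rows = new ArrayList<>();

    // Applies the batch and records the checkpoint id together.
    // Returns false when this checkpoint (or a later one) was already
    // applied, so a retry after a restart does not duplicate rows.
    synchronized boolean commit(long checkpointId, List<String> batch) {
        if (checkpointId <= lastCommittedCheckpoint) {
            return false;
        }
        rows.addAll(batch);
        lastCommittedCheckpoint = checkpointId;
        return true;
    }
}
```

The batch belonging to each checkpoint still has to survive a restart, which is where the operator state mentioned above would come in.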




Re: does the flink sink only support bio?

Jinhua Luo
No, I mean how to implement an exactly-once db commit (given that our async io
target is mysql), not the state used by Flink.
As mentioned in my previous mail, if I commit to the db in
notifyCheckpointComplete, we risk losing data (the commit is lost,
and a Flink restart would not trigger notifyCheckpointComplete for the
last checkpoint again).
On the other hand, if I update and commit per record, the sql/stored
procedure has to handle duplicate updates after a failure restart.

So when and where should I commit so that we get exactly-once db ingress?


Re: does the flink sink only support bio?

Stefan Richter
I think mixing async UPDATES with exactly-once might be tricky, and the typical use case for async IO is more about reads. So let's take a step back: what would you like to achieve with this? Do you want a read-modify-update (e.g. a map function that queries and updates a DB) or just updates (like a sink that writes to a DB)? From the previous question, I assume the second case applies, in which case I wonder why the sink needs to be async at all. I think a much better approach could be based on Flink's TwoPhaseCommitSinkFunction, maybe with some batching to lower update costs.

On top of the TwoPhaseCommitSinkFunction, you could implement transactions against your DB, similar to e.g. this example with Postgres: http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/ .

Does this help or do you really need async read-modify-update?

Best,
Stefan


Re: does the flink sink only support bio?

Jinhua Luo
The TwoPhaseCommitSinkFunction seems to record the transaction status
in the state, just like what I imagined above, correct?
And if the process fails before the commit, the commit would be
triggered again on the later restart, correct? So the commit would not be
forgotten, correct?


Re: does the flink sink only support bio?

Stefan Richter
Yes, that is how it works.
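
The recovery behaviour confirmed here can be sketched as a toy model in plain Java. It is not the real TwoPhaseCommitSinkFunction: `ToyTxnStore` and `ToyTwoPhaseSink` are hypothetical classes, the method names are illustrative, and the store is assumed to keep prepared transactions alive across a sink crash and to treat a repeated commit as a no-op.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy transactional store (assumption): prepared transactions survive a
// crash of the sink until they are committed.
class ToyTxnStore {
    final List<String> visible = new ArrayList<>();
    private final Map<Integer, List<String>> prepared = new HashMap<>();
    private int nextId = 0;

    int begin() {
        prepared.put(nextId, new ArrayList<>());
        return nextId++;
    }

    void write(int txn, String row) {
        prepared.get(txn).add(row);
    }

    // Idempotent: committing an unknown or already committed id does nothing.
    void commit(int txn) {
        List<String> rows = prepared.remove(txn);
        if (rows != null) {
            visible.addAll(rows);
        }
    }
}

// Toy sink: the ids of pre-committed transactions are part of the
// checkpointed state, so a restart can retry their commit.
class ToyTwoPhaseSink {
    private final ToyTxnStore store;
    private final List<Integer> pendingCommit = new ArrayList<>();
    private int current;

    ToyTwoPhaseSink(ToyTxnStore store, List<Integer> restoredPending) {
        this.store = store;
        // Recovery: re-trigger the commit for everything the restored
        // checkpoint says was pre-committed.
        for (int txn : restoredPending) {
            store.commit(txn);
        }
        this.current = store.begin();
    }

    void invoke(String row) {
        store.write(current, row);
    }

    // Returns the state that would be written into the checkpoint.
    List<Integer> snapshotState() {
        pendingCommit.add(current);
        current = store.begin();
        return new ArrayList<>(pendingCommit);
    }

    void notifyCheckpointComplete() {
        for (int txn : pendingCommit) {
            store.commit(txn);
        }
        pendingCommit.clear();
    }
}
```

If the sink writes "a" and "b", snapshots, and then crashes before `notifyCheckpointComplete` runs, constructing a new sink from the snapshotted list commits the transaction during recovery. The transaction left open by the crashed sink is simply abandoned in this sketch; a real implementation would also need to abort it.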

> On 04.01.2018 at 14:47, Jinhua Luo <[hidden email]> wrote:
>
> The TwoPhaseCommitSinkFunction seems to record the transaction status
> in the state just like what I imagine above, correct?
> and if the progress fails before commit, in the later restart, the
> commit would be triggered again, correct? So the commit would not be
> forgotten, correct?
>

Re: does the flink sink only support bio?

Tony Wei
Hi Stefan,

Since TwoPhaseCommitSinkFunction is new to me, I would like to know more details.

There are two more questions:
1.  If `snapshotState` failed at the first checkpoint, does it mean there is no state and no transaction can be aborted by default?
2. I saw FlinkKafkaProducer011 has a transaction id pool, which has multiple ids to be reused by producer, and it aborts all ids in this pool in the `initializeState`. Is this pool designed for the situation in the first problem or something I haven't noticed?

Thank you.

Best Regards,
Tony Wei

2018-01-04 22:15 GMT+08:00 Stefan Richter <[hidden email]>:
Yes, that is how it works.

> On 04.01.2018 at 14:47, Jinhua Luo <[hidden email]> wrote:
>
> The TwoPhaseCommitSinkFunction seems to record the transaction status
> in the state just like what I imagine above, correct?
> and if the progress fails before commit, in the later restart, the
> commit would be triggered again, correct? So the commit would not be
> forgotten, correct?
>
> 2018-01-03 22:54 GMT+08:00 Stefan Richter <[hidden email]>:
>> I think a mix of async UPDATES and exactly-once might be tricky,
>> and the typical use case for async IO is more about reads. So let’s take a
>> step back: what would you like to achieve with this? Do you want a
>> read-modify-update (e.g. a map function that queries and updates a DB) or
>> just updates (like a sink that writes to a DB)? From the previous
>> question, I assume the second case applies, in which case I wonder why you
>> even need to be async for a sink. I think a much better approach could be
>> based on Flink's TwoPhaseCommitSinkFunction, maybe with some
>> batching to lower update costs.
>>
>> On top of the TwoPhaseCommitSinkFunction, you could implement transactions
>> against your DB, similar to e.g. this example with Postgres:
>> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
>> .
>>
>> Does this help or do you really need async read-modify-update?
>>
>> Best,
>> Stefan
>>
>>
>> Am 03.01.2018 um 15:08 schrieb Jinhua Luo <[hidden email]>:
>>
>> No, I mean how to implement an exactly-once db commit (given that our async io
>> target is mysql), not the state used by flink.
>> As mentioned in the previous mail, if I commit to the db in
>> notifyCheckpointComplete, we risk losing data (a lost commit,
>> since a flink restart would not trigger notifyCheckpointComplete for the
>> last checkpoint again).
>> On the other hand, if I update and commit per record, the sql/stored
>> procedure has to handle duplicate updates after a failure restart.
>>
>> So, when or where should we commit so that we get exactly-once db ingress?
>>
>> 2018-01-03 21:57 GMT+08:00 Stefan Richter <[hidden email]>:
>>
>>
>> Hi,
>>
>>
>> Then how to implement exactly-once async io? That is, neither missing
>> data nor duplicating data.
>>
>>
>> From the docs about async IO here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
>> :
>>
>> "Fault Tolerance Guarantees:
>> The asynchronous I/O operator offers full exactly-once fault tolerance
>> guarantees. It stores the records for in-flight asynchronous requests in
>> checkpoints and restores/re-triggers the requests when recovering from a
>> failure."
>>
>> So it is already handled by Flink in a way that supports exactly-once.
>>
>> Is there some way to index data by checkpoint id and record which
>> checkpoints have already been committed to the db? But that means we need
>> MapState, right?
>>
>>
>> The information required depends a bit on the store that you are using;
>> maybe the last confirmed checkpoint id is enough, but maybe you require
>> something more. This transaction information is probably not "by-key" but
>> "per-operator", so I would suggest using operator state (see next answer).
>> Btw, the implementation of async operators does something very similar to
>> restore pending requests, and you can see the code in "AsyncWaitOperator".
>>
>>
>> However, the async-io operator normally follows other operators, e.g.
>> fold, so it normally faces the DataStream but not KeyedStream, and
>> DataStream only supports ListState, right?
>>
>>
>> You can use non-keyed state, aka operator state, to store such information.
>> See here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
>> . It does not require a KeyedStream.
>>
>> Best,
>> Stefan
>>
>>
>>
>> 2018-01-03 18:43 GMT+08:00 Stefan Richter <[hidden email]>:
>>
>>
>>
>> Am 01.01.2018 um 15:22 schrieb Jinhua Luo <[hidden email]>:
>>
>> 2017-12-08 18:25 GMT+08:00 Stefan Richter <[hidden email]>:
>>
>> You need to be a bit careful if your sink needs exactly-once semantics. In
>> this case things should either be idempotent or the db must support rolling
>> back changes between checkpoints, e.g. via transactions. Commits should be
>> triggered for confirmed checkpoints („notifyCheckpointComplete“).
>>
>>
>> I suspect we have a risk here: in notifyCheckpointComplete, the
>> checkpoint was completed, and if the process crashes (or the machine
>> fails) before it commits to the db, flink would restart the app,
>> restoring the state from the last checkpoint, but it would not invoke
>> notifyCheckpointComplete again, correct? If so, we would miss the
>> database ingress for the data between the last two checkpoints, am I
>> correct?
>>
>>
>> Yes, that is correct. What I was talking about was more the opposite
>> problem, i.e. committing too early. In that case, you could have committed
>> for a checkpoint that failed afterwards, and recovery will start from an
>> earlier checkpoint but with your commit already applied. You should only
>> commit after you received the notification or else your semantics can be
>> down to "at-least-once".
>>
>>
>>
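
For readers following the quoted exchange above, here is a minimal, self-contained sketch of the lifecycle being discussed: pre-commit when the checkpoint is taken, commit on notification, and re-issue the commit from restored state after a crash. It does not use Flink's actual TwoPhaseCommitSinkFunction; `FakeDb`, `Sink`, `prepare`, `restoreState` and the `checkpoint-N` transaction ids are all hypothetical names chosen for illustration, and the sketch assumes the store can make a commit idempotent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TwoPhaseCommitSketch {

    /** Hypothetical transactional store; not a real database client. */
    static class FakeDb {
        final Map<String, List<String>> prepared = new HashMap<>();
        final List<String> rows = new ArrayList<>();

        // Phase 1: make the writes durable but not yet visible.
        void prepare(String txnId, List<String> records) {
            prepared.put(txnId, new ArrayList<>(records));
        }

        // Phase 2: idempotent, so committing the same id twice is a no-op.
        void commit(String txnId) {
            List<String> records = prepared.remove(txnId);
            if (records != null) {
                rows.addAll(records);
            }
        }
    }

    /** Simplified sink; method names follow the thread, not Flink's actual class. */
    static class Sink {
        private final FakeDb db;
        private List<String> buffer = new ArrayList<>();
        private final List<String> pendingTxnIds = new ArrayList<>();

        Sink(FakeDb db) {
            this.db = db;
        }

        void invoke(String record) {
            buffer.add(record);
        }

        // Pre-commit at checkpoint time and return the ids to keep in operator state.
        List<String> snapshotState(long checkpointId) {
            String txnId = "checkpoint-" + checkpointId;
            db.prepare(txnId, buffer);
            buffer = new ArrayList<>();
            pendingTxnIds.add(txnId);
            return new ArrayList<>(pendingTxnIds);
        }

        // Commit only after the checkpoint has been confirmed.
        // Simplification: commits every pending id, regardless of checkpoint id.
        void notifyCheckpointComplete() {
            for (String txnId : pendingTxnIds) {
                db.commit(txnId);
            }
            pendingTxnIds.clear();
        }

        // On restart, re-issue the commit for every id found in the restored state.
        void restoreState(List<String> restoredTxnIds) {
            for (String txnId : restoredTxnIds) {
                db.commit(txnId);
            }
        }
    }

    public static void main(String[] args) {
        FakeDb db = new FakeDb();
        Sink sink = new Sink(db);
        sink.invoke("a");
        sink.invoke("b");
        List<String> checkpointedState = sink.snapshotState(1L);
        // Simulated crash: notifyCheckpointComplete() never runs on this instance.

        Sink restarted = new Sink(db);
        restarted.restoreState(checkpointedState);
        System.out.println(db.rows); // prints [a, b]
    }
}
```

Because the pending transaction ids are part of the checkpointed state, the "lost commit" case is covered by the restore path, and because the commit is idempotent, repeating it after a restart does not duplicate rows.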



Re: does the flink sink only support bio?

Stefan Richter
Hi,

> 1. If `snapshotState` failed at the first checkpoint, does it mean there is no state and no transaction can be aborted by default?

This is a general problem, not one limited to the first checkpoint. Whenever you open a transaction, there is no guaranteed way to record it in persistent state so that it can be aborted after a failure: in theory, your job can crash at any point right after opening a transaction. So in the end I guess you must rely on something like a timeout-based mechanism. You can make a _best effort_ to proactively cancel uncommitted transactions, for example by tracking them in state, by listing them in files, or by having a fixed pool of transaction ids and iterating over all of them for cancellation on a restart.

> 2. I saw FlinkKafkaProducer011 has a transaction id pool, which has multiple ids to be reused by the producer, and it aborts all ids in this pool in `initializeState`. Is this pool designed for the situation in the first problem or something I haven't noticed?

This implementation is very specific to KafkaProducer and is not necessarily a good blueprint for what you are planning. In particular, in this case there is a fixed and limited universe of transaction ids that a task can (re)use, so after a restart without state we can simply iterate over all possible transaction ids and issue a cancel for each of them. In general, you don’t always know all possible transaction ids in a way that allows you to opportunistically cancel all potentially orphaned transactions.
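
To illustrate the fixed-pool idea described above, here is a small sketch of a best-effort cleanup on restart. It is not the FlinkKafkaProducer011 code; `FakeStore`, `poolFor`, `abortAll` and the `sink-<subtask>-txn-<n>` id format are hypothetical, and the sketch assumes that aborting an id with no open transaction is harmless.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TransactionPoolCleanupSketch {

    /** Hypothetical store that tracks open transactions by id. */
    static class FakeStore {
        final Set<String> open = new HashSet<>();

        void begin(String txnId) {
            open.add(txnId);
        }

        // Aborting an id that has no open transaction is a harmless no-op.
        void abort(String txnId) {
            open.remove(txnId);
        }
    }

    /** Every id this subtask could ever use: a fixed, enumerable universe. */
    static List<String> poolFor(int subtaskIndex, int poolSize) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add("sink-" + subtaskIndex + "-txn-" + i);
        }
        return ids;
    }

    /** Best-effort cleanup on restart, when no state says which ids were in use. */
    static void abortAll(FakeStore store, List<String> pool) {
        for (String id : pool) {
            store.abort(id);
        }
    }

    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        List<String> pool = poolFor(0, 3);

        // A transaction is opened, then the job crashes before any checkpoint.
        store.begin(pool.get(1));

        // Restart without state: cancel every id in the pool.
        abortAll(store, pool);
        System.out.println(store.open.isEmpty()); // prints true
    }
}
```

This only works because the set of ids is known up front. With ids that cannot be enumerated (for example, random ones), there is nothing to iterate over after a stateless restart, which is why a timeout on the store side remains the fallback.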





Re: does the flink sink only support bio?

Tony Wei
Hi Stefan,

Your reply really helps me a lot. Thank you.

2018-01-08 19:38 GMT+08:00 Stefan Richter <[hidden email]>:
Hi,

> 1. If `snapshotState` failed at the first checkpoint, does it mean there is no state and no transaction can be aborted by default?

This is a general problem, not one limited to the first checkpoint. Whenever you open a transaction, there is no guaranteed way to record it in persistent state so that it can be aborted after a failure: in theory, your job can crash at any point right after opening a transaction. So in the end I guess you must rely on something like a timeout-based mechanism. You can make a _best effort_ to proactively cancel uncommitted transactions, for example by tracking them in state, by listing them in files, or by having a fixed pool of transaction ids and iterating over all of them for cancellation on a restart.

> 2. I saw FlinkKafkaProducer011 has a transaction id pool, which has multiple ids to be reused by the producer, and it aborts all ids in this pool in `initializeState`. Is this pool designed for the situation in the first problem or something I haven't noticed?

This implementation is very specific to KafkaProducer and is not necessarily a good blueprint for what you are planning. In particular, in this case there is a fixed and limited universe of transaction ids that a task can (re)use, so after a restart without state we can simply iterate over all possible transaction ids and issue a cancel for each of them. In general, you don’t always know all possible transaction ids in a way that allows you to opportunistically cancel all potentially orphaned transactions.
