Sink buffering


Sink buffering

nragon
Hi,

As mentioned in the Pravega talk at FF17 Berlin, can we somehow simulate
sink buffering (Pravega transactions) and coordinate it with checkpoints?
My intention is to buffer records before sending them to HBase.
Any opinions or tips?

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Sink buffering

nragon

Re: Sink buffering

Piotr Nowojski
In reply to this post by nragon
Hi,

Do you mean buffering records in Flink state because you want to achieve an exactly-once HBase sink? If so, keep in mind that you will need some kind of transaction support in HBase to make it 100% reliable.

Without transactions, buffering messages in state only reduces the chance of duplicated records. How much it is reduced depends on the checkpointing interval and on how long it takes to rewrite messages from the state buffer to HBase. For example, if the checkpoint interval is 10 s and rewriting takes 2 s, your chance of duplicates is 2/10 of what you would have without buffering, because you actively write to HBase only 20% of the time you would otherwise be writing.
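The back-of-the-envelope estimate above fits in a tiny helper. This is only a sketch of that simplified model: the name `duplicate_exposure` is hypothetical, and it assumes duplicates can arise only while a rewrite from the state buffer is in flight, ignoring how failures are actually distributed over time.

```python
def duplicate_exposure(rewrite_ms: float, checkpoint_interval_ms: float) -> float:
    """Share of time a buffered sink actively writes to HBase, relative to an
    unbuffered sink that writes all the time (simplified model from the thread)."""
    if checkpoint_interval_ms <= 0:
        raise ValueError("checkpoint interval must be positive")
    if rewrite_ms < 0:
        raise ValueError("rewrite time must be non-negative")
    # In this model a rewrite cannot occupy more than the whole interval.
    return min(rewrite_ms / checkpoint_interval_ms, 1.0)


print(duplicate_exposure(2_000, 10_000))  # 0.2, i.e. 20% of the unbuffered chance
```

Note that if the rewrite takes as long as the interval itself, the model degenerates to the unbuffered case (1.0).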

Having said that, you can take a look at the following classes, on which you could base your sink:
GenericWriteAheadSink - probably this one is better for you
TwoPhaseCommitSinkFunction - this one is in the master branch but hasn’t been released yet
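The write-ahead idea can be sketched as a plain-Python simulation: records are buffered per checkpoint and only rewritten to the store once that checkpoint is confirmed, with a committer remembering which checkpoints were already written. This is loosely modeled on the idea behind GenericWriteAheadSink and its checkpoint committer, not on Flink's actual API; `FakeHBase`, `CheckpointCommitter` and `WriteAheadBuffer` here are hypothetical stand-ins.

```python
from dataclasses import dataclass, field


@dataclass
class FakeHBase:
    """Stand-in for the external store: just collects written rows."""
    rows: list = field(default_factory=list)

    def put_all(self, values):
        self.rows.extend(values)


@dataclass
class CheckpointCommitter:
    """Remembers which checkpoints have already been written out."""
    committed: set = field(default_factory=set)

    def is_committed(self, checkpoint_id):
        return checkpoint_id in self.committed

    def commit(self, checkpoint_id):
        self.committed.add(checkpoint_id)


class WriteAheadBuffer:
    def __init__(self, store, committer):
        self.store = store
        self.committer = committer
        self.current = []  # records received since the last checkpoint
        self.pending = {}  # checkpoint_id -> records (would live in Flink state)

    def invoke(self, record):
        # Nothing reaches the store yet; records only go into the buffer.
        self.current.append(record)

    def snapshot_state(self, checkpoint_id):
        # On a checkpoint, seal the current buffer under this checkpoint id.
        self.pending[checkpoint_id] = self.current
        self.current = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Only after the checkpoint is confirmed are buffered records rewritten.
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            if not self.committer.is_committed(cid):
                self.store.put_all(self.pending[cid])
                self.committer.commit(cid)
            del self.pending[cid]


store = FakeHBase()
sink = WriteAheadBuffer(store, CheckpointCommitter())
sink.invoke("a")
sink.invoke("b")
sink.snapshot_state(1)
sink.invoke("c")
print(store.rows)  # [] : nothing written before checkpoint 1 completes
sink.notify_checkpoint_complete(1)
print(store.rows)  # ['a', 'b']
```

The gap between `put_all` and `committer.commit` is exactly the window described above: without transactions in the store, a crash there replays the batch and produces duplicates.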

Piotrek

> On Sep 29, 2017, at 6:21 PM, nragon <[hidden email]> wrote:


Re: Sink buffering

nragon
Thanks for your opinion on this.
TwoPhaseCommitSinkFunction would probably be the best solution overall.
Using it with something like Phoenix or Tephra would probably work.
This always depends on the checkpointing interval, right?




Re: Sink buffering

Piotr Nowojski
What do you mean by “This always depends on the checkpointing interval, right?”?

In TwoPhaseCommitSinkFunction, transactions are committed on each Flink checkpoint. I guess the same applies to GenericWriteAheadSink. The difference is that the first one just pre-commits the data on a checkpoint (and commits it once the checkpoint completes), while the second one rewrites the data from state.
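The two-phase variant can be sketched the same way, as a plain-Python simulation of the lifecycle. It only mirrors the shape of TwoPhaseCommitSinkFunction, whose Java hooks are beginTransaction, invoke, preCommit, commit and abort (abort is omitted here); `Txn`, `TransactionalStore` and `TwoPhaseCommitSketch` are hypothetical stand-ins, not Flink or HBase APIs.

```python
import itertools
from dataclasses import dataclass, field


@dataclass
class Txn:
    txn_id: int
    # A real sink would hold a handle to an open external transaction here;
    # this sketch just collects the writes made inside it.
    writes: list = field(default_factory=list)


class TransactionalStore:
    """Hypothetical store with transactions: writes become visible only on commit."""

    def __init__(self):
        self.visible = []
        self.prepared = {}

    def pre_commit(self, txn):
        self.prepared[txn.txn_id] = list(txn.writes)

    def commit(self, txn_id):
        self.visible.extend(self.prepared.pop(txn_id))


class TwoPhaseCommitSketch:
    def __init__(self, store):
        self.store = store
        self._ids = itertools.count()
        self.current = self.begin_transaction()
        self.pending = {}  # checkpoint_id -> pre-committed Txn (would be in Flink state)

    def begin_transaction(self):
        return Txn(next(self._ids))

    def invoke(self, record):
        # Records are written into the open transaction immediately.
        self.current.writes.append(record)

    def snapshot_state(self, checkpoint_id):
        # Phase 1: pre-commit the open transaction and start a fresh one.
        self.store.pre_commit(self.current)
        self.pending[checkpoint_id] = self.current
        self.current = self.begin_transaction()

    def notify_checkpoint_complete(self, checkpoint_id):
        # Phase 2: commit every transaction pre-committed up to this checkpoint.
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.store.commit(self.pending.pop(cid).txn_id)


store = TransactionalStore()
sink = TwoPhaseCommitSketch(store)
sink.invoke("x")
sink.invoke("y")
sink.snapshot_state(1)
sink.invoke("z")
print(store.visible)  # []
sink.notify_checkpoint_complete(1)
print(store.visible)  # ['x', 'y']
```

Here the data reaches the store during processing; only its visibility is deferred until the checkpoint completes, which is why only transaction handles, not the events, need to be kept in state.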

If your sink supports transactions (and, in particular, can handle committing the same transaction twice), then both should be able to provide exactly-once semantics.
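Committing the same transaction twice can happen after recovery: if the job fails after a commit but before the next checkpoint, the restored state still lists that transaction as pending and the commit is retried. A minimal sketch of a commit that tolerates this, assuming the store itself tracks committed transaction ids (the class and method names are hypothetical; a real store would need this bookkeeping to be durable):

```python
class IdempotentTxnStore:
    """Hypothetical transactional store whose commit() tolerates being replayed."""

    def __init__(self):
        self.rows = []
        self.prepared = {}      # txn_id -> rows awaiting commit
        self.committed = set()  # txn_ids already made visible

    def pre_commit(self, txn_id, rows):
        self.prepared[txn_id] = list(rows)

    def commit(self, txn_id):
        if txn_id in self.committed:
            return False  # replayed commit: ignored, no duplicate rows
        self.rows.extend(self.prepared.pop(txn_id))
        self.committed.add(txn_id)
        return True


store = IdempotentTxnStore()
store.pre_commit("txn-1", ["a", "b"])
print(store.commit("txn-1"))  # True
print(store.commit("txn-1"))  # False (retried after a simulated restore)
print(store.rows)             # ['a', 'b']
```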

Piotrek

> On Oct 4, 2017, at 10:30 AM, nragon <[hidden email]> wrote:
>
> Thanks for you opinion on this.
> TwoPhaseCommitSinkFunction would probably be the best solution overall.
> Using this with something like Phoenix or Tephra would probably work.
> This always depends on checkpointing interval right?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Sink buffering

nragon
By “checkpointing interval” I meant that transactions are committed on each Flink
checkpoint.
So, if I set my checkpoint interval to 10000 ms, there will be a commit every
10000 ms, right?
If I understood correctly, TwoPhaseCommitSinkFunction stores transactions
in its state, whereas GenericWriteAheadSink stores events, which are then
committed once the checkpoint completes?






Re: Sink buffering

Piotr Nowojski
Interval: yes.

TwoPhaseCommitSinkFunction: yes, but it depends on how you implement your “Transaction” class. It wouldn’t make a lot of sense, but you could store events inside the transaction “POJO”.
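The two shapes of transaction object can be contrasted in a short sketch. Both classes are hypothetical illustrations, not Flink types: the first is just a handle to a transaction living in the external system, the second carries the events themselves, so they end up in checkpointed state. The helper estimates how many events a buffering transaction would carry with one transaction per checkpoint, e.g. for the 10000 ms interval from the question; the event rate is an assumed input.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class HandleTransaction:
    # Usual shape: only an identifier of a transaction already open in the
    # external system (hypothetical, e.g. an id issued by a transaction manager).
    txn_id: str


@dataclass
class BufferingTransaction:
    # Unusual shape: the "transaction" POJO carries the events themselves,
    # so they are stored in Flink's checkpointed state until commit.
    txn_id: str
    events: List[str] = field(default_factory=list)


def events_per_transaction(events_per_second: int, checkpoint_interval_ms: int) -> int:
    # One transaction per checkpoint: roughly rate x interval events are
    # buffered inside a BufferingTransaction before it is committed.
    return events_per_second * checkpoint_interval_ms // 1000


print(events_per_transaction(5_000, 10_000))  # 50000
```

With a buffering transaction, the commit step is effectively the same “rewrite from state” that a write-ahead sink performs.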

Piotrek

> On Oct 4, 2017, at 12:45 PM, nragon <[hidden email]> wrote:


Re: Sink buffering

nragon