How to deal with blocking calls inside a Sink?

3 messages

How to deal with blocking calls inside a Sink?

Federico D'Ambrosio
Hi, I've implemented a sink for Hive as a RichSinkFunction, but once I integrated it into my current Flink job, I noticed that event processing slowed down considerably, I suspect because of some blocking calls that need to be made when interacting with the Hive streaming API.

So, what can be done so that throughput doesn't suffer from these calls? I guess greatly increasing the parallelism of the sink operator could be a solution, but I don't think it's a particularly good one.

Maybe using the AsyncFunction API? That is, decoupling the sink into a buffer that forwards the data, with the blocking operations performed inside the asyncInvoke method of the AsyncFunction?
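Something like this sketch of what I mean by decoupling, in plain Java (the names are made up for illustration, and I'm using a thread pool plus CompletableFuture to stand in for the hand-off that would actually live in asyncInvoke):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AsyncSinkSketch {

    // Stand-in for a blocking Hive write; name and behavior are illustrative only.
    static String blockingWrite(String event) {
        try {
            Thread.sleep(10); // simulate I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "written:" + event;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> events = List.of("a", "b", "c", "d");

        // Issue all writes concurrently instead of blocking the caller once
        // per event -- the same decoupling an AsyncFunction would provide
        // inside a Flink job.
        List<CompletableFuture<String>> futures = events.stream()
                .map(e -> CompletableFuture.supplyAsync(() -> blockingWrite(e), pool))
                .collect(Collectors.toList());

        for (CompletableFuture<String> f : futures) {
            System.out.println(f.get());
        }
        pool.shutdown();
    }
}
```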

Any suggestion is appreciated.
Kind regards,
Federico D'Ambrosio

Re: How to deal with blocking calls inside a Sink?

Timo Walther
Hi Federico,

would it help to buffer events first and perform batches of insertions for better throughput? I saw some similar work recently here: https://tech.signavio.com/2017/postgres-flink-sink
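A rough sketch of the buffering idea in plain Java (the class and method names are hypothetical, not Flink API; in a real RichSinkFunction the flush() below would issue one batched Hive/JDBC write):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffering sink: collects events and flushes them as one batch
// once a threshold is reached, trading per-event latency for throughput.
public class BufferingSink {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private int flushCount = 0;

    public BufferingSink(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called once per incoming event; only every batchSize-th call pays
    // the cost of an actual write.
    public void invoke(String event) {
        buffer.add(event);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // In a real sink this would perform a single batched insertion.
    void flush() {
        flushCount++;
        buffer.clear();
    }

    public int getFlushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink(3);
        for (String e : new String[] {"a", "b", "c", "d"}) {
            sink.invoke(e);
        }
        // "a", "b", "c" triggered one flush; "d" is still buffered.
        System.out.println("flushes: " + sink.getFlushCount());
    }
}
```

A real implementation would also need a time-based flush (and a flush on checkpoint/close) so a partially filled buffer can't hold events back indefinitely.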

But I would first try the AsyncIO approach, because this is exactly the kind of use case it was made for.

Regards,
Timo


On 10/2/17 at 11:53 AM, Federico D'Ambrosio wrote:



Re: How to deal with blocking calls inside a Sink?

Federico D'Ambrosio
Hi Timo,

thank you for your response. Just yesterday I tried the JDBC connector and, unfortunately, found out that the HivePreparedStatement and HiveStatement implementations still don't provide an addBatch implementation, which is the interface method the connector relies on. The first dirty solution that came to mind was to slightly modify the current JDBCOutputFormat to concatenate the insert queries, as strings, and pass the result to the execute() method of the Statement.
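Roughly, one variant of the concatenation I have in mind (a sketch only: it builds a single multi-row INSERT so that one execute() call covers the whole batch, and it assumes the values are already escaped, which a real sink would have to guarantee):

```java
import java.util.List;
import java.util.stream.Collectors;

public class MultiRowInsert {

    // Builds one multi-row INSERT statement from a batch of rows.
    // Values are assumed pre-escaped SQL literals (illustrative only;
    // a production sink must sanitize or parameterize them).
    public static String buildInsert(String table, List<List<String>> rows) {
        String values = rows.stream()
                .map(row -> "(" + String.join(", ", row) + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " VALUES " + values;
    }

    public static void main(String[] args) {
        String sql = buildInsert("events",
                List.of(List.of("1", "'a'"), List.of("2", "'b'")));
        System.out.println(sql);
        // → INSERT INTO events VALUES (1, 'a'), (2, 'b')
    }
}
```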

I guess that using AsyncIO would really be the best approach, as you say.

Regards,
Federico

On 2017-10-02 at 12:17 GMT+02:00, Timo Walther <[hidden email]> wrote:


--
Federico D'Ambrosio