Hello,
We are using flink-connector-elasticsearch6_2.11 to ingest stream data into ES using bulk requests. From the ES metrics we observed some bulk thread pool rejections. We contacted the AWS team; their explanation is that part of the bulk request was rejected, and the response body should include a status for each item. For a bulk thread pool rejection, the status code is 429.

Our Flink app overrides the FailureHandler to process error cases. I checked the Flink code; it has an afterBulk() method to handle item errors, but our FailureHandler never received any 429 error. Is this a Flink issue, or do we need to configure something to make it work?

Thanks,
Qihua
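For reference, this is the shape of handler we register. A sketch based on the ActionRequestFailureHandler interface from flink-connector-elasticsearch6 (the EsRejectedExecutionException check follows the example in the Flink connector docs; the class name RetryRejectedHandler is ours):

```java
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class RetryRejectedHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            // 429: the bulk thread pool rejected the item -> re-queue it for the next bulk
            indexer.add(action);
        } else {
            // any other failure: rethrow and fail the sink
            throw failure;
        }
    }
}
```

The handler is passed to the sink via the builder's setFailureHandler(...); it is only invoked for items the internal BulkProcessor has given up on.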
Hi,
Have you tried changing bulk.flush.backoff.enable? According to the docs [1], the underlying ES BulkProcessor will retry by default, so the provided failure handler might not be called.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor

Regards,
Roman
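These backoff settings map onto the ElasticsearchSink.Builder in flink-connector-elasticsearch6. A minimal configuration sketch (httpHosts and sinkFunction are assumed to be defined elsewhere; the concrete values here are illustrative, not recommendations):

```java
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

ElasticsearchSink.Builder<String> builder =
        new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);

// Retry bulk requests that were (partially) rejected with 429.
builder.setBulkFlushBackoff(true);
builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
builder.setBulkFlushBackoffRetries(3);   // after this many attempts the failure handler is invoked
builder.setBulkFlushBackoffDelay(1000);  // initial backoff delay in milliseconds
```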
Thank you for the reply!
Yes, we did configure bulk.flush.backoff.enable. So the ES BulkProcessor retried after the bulk request was partially rejected, and eventually the request was sent successfully? Is that why the failure handler was not called?

Thanks,
Qihua
> So the ES BulkProcessor retried after the bulk request was partially rejected, and eventually the request was sent successfully? Is that why the failure handler was not called?
If the bulk request still fails after the maximum number of retries (bulk.flush.backoff.retries), the failure handler will be called.

Best,
Yangze Guo
Got it! Thanks for helping.
If you are using the 6.* ES connector, there is actually a deadlock bug when backoff is enabled. The 'retry' and 'flush' paths share one thread pool, which has only one thread. Sometimes the task holding the thread tries to acquire a semaphore that is held by the task waiting for the thread. Please upgrade to the 7.* connector.
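The deadlock pattern described above can be reproduced with plain java.util.concurrent (this is a standalone illustration of the single-thread-pool-plus-semaphore cycle, not the actual connector code):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SingleThreadDeadlock {
    // Returns true if the "retry" task cannot finish: the pool's only thread
    // blocks on a semaphore that the "flush" path is still holding, while the
    // flush path is waiting for that task to finish.
    static boolean demonstrateDeadlock(long timeoutMs) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // one thread, like the shared pool
        Semaphore permit = new Semaphore(1);

        permit.acquire(); // the "flush" path takes the semaphore first

        Future<?> retry = pool.submit(() -> {
            try {
                permit.acquire(); // "retry" needs the semaphore -> blocks forever
                permit.release();
            } catch (InterruptedException ignored) {
            }
        });

        try {
            retry.get(timeoutMs, TimeUnit.MILLISECONDS); // flush waits for the sole thread
            return false; // retry finished: no deadlock
        } catch (ExecutionException e) {
            return false;
        } catch (TimeoutException e) {
            return true;  // neither side can make progress
        } finally {
            retry.cancel(true);
            pool.shutdownNow();
            permit.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demonstrateDeadlock(500) ? "deadlocked" : "completed");
    }
}
```

Because the pool has exactly one thread, the semaphore holder and the thread holder can never be the same task, and the cycle never resolves; a timeout is used here only so the demonstration terminates.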
From: Qihua Yang <[hidden email]>
Sent: May 24, 2021, 23:17
To: Yangze Guo <[hidden email]>
Cc: [hidden email]; user <[hidden email]>
Subject: Re: ES sink never receive error code
Jacky is right. It's a known issue and will be fixed in FLINK-21511.
Best,
Yangze Guo