ES sink never receive error code

Qihua Yang
Hello, 
We are using flink-connector-elasticsearch6_2.11 to ingest stream data into ES with bulk requests. In the ES metrics we observed some bulk thread pool rejections. We contacted the AWS team, and their explanation is that part of the bulk request was rejected. The response body should include a status for each item, and for a bulk thread pool rejection the error code is 429.
Our Flink app overrides the FailureHandler to process error cases.
I checked the Flink code: it has an afterBulk() method to handle item errors, but the FailureHandler never received any 429 error.
Is that a Flink issue, or do we need to configure something to make it work?
Thanks,

Qihua
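The situation described above, a bulk request that succeeds as a whole while some items inside it come back with 429, can be sketched in plain Java. This is a model under stated assumptions, not Flink or Elasticsearch code: PartialBulk, ItemResult, collectFailures and hasRejections are hypothetical names, and treating every 4xx/5xx item status as a failure is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class PartialBulk {
    // Hypothetical model of one item's result inside a bulk response (not an Elasticsearch class).
    static final class ItemResult {
        final int itemId;
        final int status; // per-item HTTP-style status, e.g. 201 (created) or 429 (rejected)

        ItemResult(int itemId, int status) {
            this.itemId = itemId;
            this.status = status;
        }

        // Assumption of this sketch: any 4xx/5xx item status counts as a failure.
        boolean failed() {
            return status >= 400;
        }
    }

    // A bulk call can succeed overall while individual items fail,
    // so failures have to be picked out item by item.
    static List<ItemResult> collectFailures(List<ItemResult> items) {
        List<ItemResult> failures = new ArrayList<>();
        for (ItemResult item : items) {
            if (item.failed()) {
                failures.add(item);
            }
        }
        return failures;
    }

    // True if at least one item was rejected by a full thread pool (status 429).
    static boolean hasRejections(List<ItemResult> items) {
        for (ItemResult item : items) {
            if (item.status == 429) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<ItemResult> response = List.of(
                new ItemResult(1, 201),
                new ItemResult(2, 429),
                new ItemResult(3, 201));
        System.out.println(collectFailures(response).size()); // prints 1
        System.out.println(hasRejections(response));          // prints true
    }
}
```

Whether such a 429 item ever reaches a user failure handler depends on the retry settings discussed in the replies below.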

Re: ES sink never receive error code

Roman Khachatryan
Hi,

Have you tried changing bulk.flush.backoff.enable?
According to the docs [1], the underlying ES BulkProcessor will retry
(by default), so the provided failure handler might not be called.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor

Regards,
Roman
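The retries mentioned above are governed by the backoff options on the linked page: a backoff type (CONSTANT or EXPONENTIAL), a delay, and a number of retries. The plain-Java sketch below only computes the resulting wait schedule, to show how long a rejection can be absorbed before anything is reported. BackoffSchedule, delays and totalWait are hypothetical names, and the doubling used for EXPONENTIAL is an assumption of this sketch, not necessarily the formula Elasticsearch's own backoff policy applies.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {
    enum Type { CONSTANT, EXPONENTIAL }

    // Delay in milliseconds before each retry.
    // Assumption: EXPONENTIAL simply doubles the delay after every retry.
    static List<Long> delays(Type type, long delayMillis, int retries) {
        List<Long> result = new ArrayList<>();
        long current = delayMillis;
        for (int i = 0; i < retries; i++) {
            result.add(current);
            if (type == Type.EXPONENTIAL) {
                current *= 2;
            }
        }
        return result;
    }

    // Total time spent waiting across all retries before giving up.
    static long totalWait(Type type, long delayMillis, int retries) {
        long sum = 0;
        for (long d : delays(type, delayMillis, retries)) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(delays(Type.CONSTANT, 100, 3));       // prints [100, 100, 100]
        System.out.println(delays(Type.EXPONENTIAL, 100, 3));    // prints [100, 200, 400]
        System.out.println(totalWait(Type.EXPONENTIAL, 100, 3)); // prints 700
    }
}
```

Under these assumptions, three exponential retries starting at 100 ms wait 700 ms in total; a 429 that clears within that window is absorbed by the retries and is not reported to the failure handler.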

On Thu, May 20, 2021 at 10:08 PM Qihua Yang <[hidden email]> wrote:

Re: ES sink never receive error code

Qihua Yang
Thank you for the reply!
Yes, we did configure bulk.flush.backoff.enable.
So the ES BulkProcessor retried after the bulk request was partially rejected, and eventually that request was sent successfully? Is that why the failure handler was not called?

Thanks,
Qihua

On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan <[hidden email]> wrote:

Re: ES sink never receive error code

Yangze Guo
> So, ES BulkProcessor retried after bulk request was partially rejected. And eventually that request was sent successfully? That is why failure handler was not called?

If the bulk request fails after the max number of retries
(bulk.flush.backoff.retries), the failure handler will still be
called.


Best,
Yangze Guo
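Taken together, the two answers describe one rule: a 429 is retried while retries remain, and only a failure that survives all of them (or any failure when backoff is disabled) is handed to the failure handler. The plain-Java sketch below models that rule for a single item. It is not the connector's code: RetryThenHandler, RecordingHandler, send and scripted are hypothetical; the real BulkProcessor resends only the rejected items of a bulk request; and the backoff sleeps are omitted so it runs instantly. Retrying only 429 follows the connector docs, which describe retries for temporary EsRejectedExecutionException failures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

final class RetryThenHandler {
    // Hypothetical stand-in for a user failure handler: it just records what it receives.
    static final class RecordingHandler {
        final List<Integer> received = new ArrayList<>();

        void onFailure(int status) {
            received.add(status);
        }
    }

    // Sends one item; attempt.getAsInt() yields the item's status for each try.
    // Only 429 is retried, and only while retries remain; whatever still fails
    // after that is handed to the handler. Backoff sleeps are omitted.
    static boolean send(IntSupplier attempt, boolean backoffEnabled, int maxRetries,
                        RecordingHandler handler) {
        int retriesLeft = backoffEnabled ? maxRetries : 0;
        while (true) {
            int status = attempt.getAsInt();
            if (status < 400) {
                return true; // accepted: the handler is never called
            }
            if (status == 429 && retriesLeft > 0) {
                retriesLeft--; // transient rejection: try again
                continue;
            }
            handler.onFailure(status); // retries exhausted, or not retryable
            return false;
        }
    }

    // Replays the given statuses in order, repeating the last one once they run out.
    static IntSupplier scripted(int... statuses) {
        int[] next = {0};
        return () -> statuses[Math.min(next[0]++, statuses.length - 1)];
    }

    public static void main(String[] args) {
        RecordingHandler h1 = new RecordingHandler();
        // Rejected twice, then accepted: the retries hide both 429s.
        System.out.println(send(scripted(429, 429, 201), true, 3, h1) + " " + h1.received); // prints true []
        RecordingHandler h2 = new RecordingHandler();
        // Rejected on every try: after 3 retries the 429 reaches the handler.
        System.out.println(send(scripted(429), true, 3, h2) + " " + h2.received); // prints false [429]
    }
}
```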

On Fri, May 21, 2021 at 5:53 AM Qihua Yang <[hidden email]> wrote:

Re: ES sink never receive error code

Qihua Yang
Got it! Thanks for helping.

On Thu, May 20, 2021 at 7:15 PM Yangze Guo <[hidden email]> wrote:

Re: ES sink never receive error code

Jacky Yin 殷传旺
If you are using ES connector 6.*, there is actually a deadlock bug when backoff is enabled. The 'retry' and the 'flush' share one thread pool that has only one thread. Sometimes the task holding that thread tries to acquire the semaphore, which is held by the task that is waiting for the thread. Therefore, please upgrade to connector 7.*.
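The deadlock described above is a cycle: one side holds the semaphore and waits for the only thread, the other holds the only thread and waits for the semaphore. The stdlib-only sketch below reproduces that shape; it is not the connector's code. A Semaphore with one permit stands in for the bulk request semaphore, a single-thread executor stands in for the shared pool, and SharedPoolDeadlock and flushBlockedBehindRetry are hypothetical names. A timeout is used only to detect the hang, and the permit is released at the end so the demo exits instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SharedPoolDeadlock {
    // Returns true if the "flush" work is still stuck behind the "retry" task after
    // timeoutMillis, i.e. each side is waiting for what the other one holds.
    static boolean flushBlockedBehindRetry(long timeoutMillis) {
        Semaphore permit = new Semaphore(1); // stands in for the bulk request semaphore
        permit.acquireUninterruptibly();     // the flush side holds the permit...
        ExecutorService sharedPool = Executors.newSingleThreadExecutor(); // the one shared thread
        try {
            CountDownLatch retryOnThread = new CountDownLatch(1);
            // The retry side occupies the only pool thread and then waits for the permit.
            Future<?> retry = sharedPool.submit(() -> {
                retryOnThread.countDown();
                permit.acquireUninterruptibly();
                permit.release();
            });
            retryOnThread.await();
            // ...and the flush side now needs that thread, which the retry task never frees.
            Future<?> flushWork = sharedPool.submit(() -> { });
            try {
                flushWork.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return false; // not reachable in this setup: the thread is never free
            } catch (TimeoutException e) {
                return !retry.isDone(); // both sides are still waiting on each other
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            permit.release();      // break the cycle so this demo can exit
            sharedPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(flushBlockedBehindRetry(200)); // prints true
    }
}
```

In this model, giving retries and flushes separate threads, or never blocking on the semaphore from the pool thread, would remove the cycle; which approach the connector actually takes is not stated in this thread.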
 

From: Qihua Yang <[hidden email]>
Sent: May 24, 2021 23:17
To: Yangze Guo <[hidden email]>
Cc: [hidden email] <[hidden email]>; user <[hidden email]>
Subject: Re: ES sink never receive error code
 
Re: ES sink never receive error code

Yangze Guo
Jacky is right. It's a known issue and will be fixed in FLINK-21511.

Best,
Yangze Guo

On Tue, May 25, 2021 at 8:40 AM Jacky Yin 殷传旺 <[hidden email]> wrote:
