Back-pressure Status shows OK but records are backed up in kafka

Jins George
I have a Beam pipeline that consumes records from Kafka, applies some
transformations, and writes the results to HBase. During a temporary
surge in incoming traffic, records were written to HBase more slowly
than messages arrived in Kafka.

In the Flink UI, the back pressure status shows OK. I have a single
task that contains all the operators, including the source.

Any idea why the back pressure indicator would show OK while messages
are backed up in Kafka?

Is there any other mechanism or metric I can use to identify this
situation?

I'm running Flink 1.2 with Beam 2.0.

Thanks,
Jins George

Re: Back-pressure Status shows OK but records are backed up in kafka

Ufuk Celebi
Hey Jins,

Our current back pressure tracking mechanism does not work with Kafka
sources. To gather back pressure indicators, we sample the main task
thread of a subtask. For most tasks, this is the thread that emits
records downstream (e.g. if you have a map function), and everything
works as expected. In the case of the Kafka source, though, a
separate thread consumes from Kafka and emits the records.
Therefore we sample the "wrong" thread and don't observe any
indicators of back pressure. :-( Unfortunately, this was not taken
into account when back pressure sampling was implemented.

There is an old issue that tracks this problem:
https://issues.apache.org/jira/browse/FLINK-3456
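
As a rough illustration of the sampling problem described above, the following Python sketch samples the stacks of two threads and counts how often a blocking buffer request appears. It is not Flink code: `request_buffer_blocking`, the thread names, and the sample count are hypothetical stand-ins chosen for the example. The "task" thread only waits, while a separate "fetcher" thread does the emitting and blocks on a full output buffer.

```python
import queue
import sys
import threading


def request_buffer_blocking(buffers, record, entered):
    """Hypothetical stand-in for a blocking output-buffer request."""
    entered.set()        # signal that this frame is now on the stack
    buffers.put(record)  # blocks for as long as the bounded queue is full


def stack_contains(thread, func_name):
    """Return True if func_name appears anywhere on the thread's current stack."""
    frame = sys._current_frames().get(thread.ident)
    while frame is not None:
        if frame.f_code.co_name == func_name:
            return True
        frame = frame.f_back
    return False


def back_pressure_ratio(thread, samples=20):
    """Fraction of stack samples in which the thread sits in the blocking call."""
    hits = sum(stack_contains(thread, "request_buffer_blocking")
               for _ in range(samples))
    return hits / samples


def demo():
    buffers = queue.Queue(maxsize=1)
    buffers.put("already full")  # downstream is slow: no free buffer
    entered = threading.Event()
    stop = threading.Event()

    # The sampled "main task thread" only waits; it never emits records.
    task = threading.Thread(target=stop.wait, name="task-main", daemon=True)
    # The separate "fetcher" thread emits records and is the one that blocks.
    fetcher = threading.Thread(target=request_buffer_blocking,
                               args=(buffers, "record", entered),
                               name="fetcher", daemon=True)
    task.start()
    fetcher.start()
    entered.wait()  # make sure the fetcher has reached the blocking call

    ratios = (back_pressure_ratio(task), back_pressure_ratio(fetcher))

    buffers.get()  # free a buffer so the fetcher can finish
    stop.set()
    task.join()
    fetcher.join()
    return ratios
```

Calling `demo()` returns the ratio for the sampled task thread first and the ratio for the fetcher thread second. Under these assumptions the task thread never shows the blocking call even though the fetcher is blocked the whole time, which matches an OK status despite real back pressure.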

I'm not aware of any other way to track this situation. Maybe others
can chime in here...

– Ufuk


On Mon, Jan 8, 2018 at 8:16 AM, Jins George <[hidden email]> wrote:

> I have a Beam Pipeline consuming records from Kafka doing some
> transformations and writing it to Hbase. I faced an issue in which records
> were writing to Hbase at a slower rate than the incoming messages to Kafka
> due to a temporary surge in the incoming traffic.
>
> From the flink UI, if I check the back pressure status, it shows OK. I have
> one task which has all the operators including source.
>
> Any idea why backpressure indicator would show OK, but messages are backed
> up in Kafka.
>
> Is there any other mechanism/metrics by which I can identify this situation
> ?
>
> I'm running Flink 1.2/w beam 2.0.
>
> Thanks,
> Jins George

Re: Back-pressure Status shows OK but records are backed up in kafka

Shannon Carey
Right, the back pressure indicator only measures back pressure inside the Flink job, i.e. between Flink tasks.

Therefore, it’s up to you to monitor whether your Flink job is “keeping up” with the source stream. If you’re using Kafka, there’s a metric that the consumer library makes available. For example, for one of our jobs, in Graphite we have a metric that matches:

aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max, 18, 19)

The “$Job” is a variable which allows you to select the job. You can see that I have wildcards on other elements of the path, for example the TaskManager id, the operator name, the Task index, etc. Your metric is probably rooted somewhere else, but the thing you’re looking for is under operator.*.*.KafkaConsumer.records-lag-max.
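
One way to turn a series of records-lag-max readings into a "keeping up" signal is sketched below. This is only an assumption about how such a check might look, not something taken from our setup: the function name `is_falling_behind`, the threshold, and the number of consecutive samples are all hypothetical and would need tuning for a real job.

```python
from typing import Sequence


def is_falling_behind(lag_samples: Sequence[float],
                      threshold: float,
                      min_consecutive: int = 3) -> bool:
    """Return True if the most recent samples are all above `threshold`
    and the lag never decreases across them (hypothetical alert rule)."""
    if len(lag_samples) < min_consecutive:
        return False  # not enough data to judge a trend
    recent = list(lag_samples[-min_consecutive:])
    above = all(value > threshold for value in recent)
    non_decreasing = all(b >= a for a, b in zip(recent, recent[1:]))
    return above and non_decreasing
```

For example, `is_falling_behind([10, 5000, 9000, 15000], threshold=1000)` flags a job whose lag keeps growing, while a job that is draining a backlog, such as `[20000, 9000, 4000]`, is not flagged even though its lag is still above the threshold.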

Flink manages its offsets itself, rather than acting like a “normal” consumer which commits offsets to Kafka. However, in the docs I see that “setCommitOffsetsOnCheckpoints()” is enabled by default. So, theoretically you can use any sort of tool similar to https://github.com/srotya/kafka-lag-monitor or https://github.com/linkedin/Burrow etc. which polls Kafka itself and produces metrics about consumer lag.

However, for some reason, I don’t see our Flink consumer metrics showing up in our lag monitoring tool or in the Kafka command-line tools, so I’m not sure what’s going on there. Maybe it’s because Flink doesn’t show up as a consumer group? At first I thought that it might be because we’re not setting the “group.id” property, but as it turns out we are indeed setting it. In any case, we have to use the job’s metrics, and monitor that the job is up, rather than monitoring the offset in Kafka itself.
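
For reference, the lag that such external tools report is essentially the log end offset minus the committed offset, per partition. The sketch below shows that arithmetic on plain dictionaries. It does not talk to Kafka, and the function name `consumer_lag` and the sample topic name are hypothetical. Partitions without a committed offset are reported as `None` rather than guessed, which is roughly the situation when a consumer's commits are not visible to the tool.

```python
from typing import Dict, Optional, Tuple

TopicPartition = Tuple[str, int]  # (topic name, partition number)


def consumer_lag(end_offsets: Dict[TopicPartition, int],
                 committed_offsets: Dict[TopicPartition, int]
                 ) -> Dict[TopicPartition, Optional[int]]:
    """Per-partition lag: log end offset minus committed offset.

    A partition with no committed offset maps to None (lag unknown).
    """
    lag: Dict[TopicPartition, Optional[int]] = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        if committed is None:
            lag[partition] = None
        else:
            # Clamp at zero in case the two values were read at different times.
            lag[partition] = max(0, end - committed)
    return lag


def total_known_lag(lag: Dict[TopicPartition, Optional[int]]) -> int:
    """Sum of the lag over partitions where it is known."""
    return sum(value for value in lag.values() if value is not None)
```

With end offsets `{("events", 0): 1500, ("events", 1): 900, ("events", 2): 40}` and committed offsets for partitions 0 and 1 of 1200 and 900, partition 0 has a lag of 300, partition 1 has none, and partition 2 is unknown.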

-Shannon

On 1/8/18, 1:52 AM, "Ufuk Celebi" <[hidden email]> wrote:

    Hey Jins,
   
    our current back pressure tracking mechanism does not work with Kafka
    sources. To gather back pressure indicators we sample the main task
    thread of a subtask. For most tasks, this is the thread that emits
    records downstream (e.g. if you have a map function) and everything
    works as expected. In case of the Kafka source though there is a
    separate thread that consumes from Kafka and emits the records.
    Therefore we sample the "wrong" thread and don't observe any
    indicators for back pressure. :-( Unfortunately, this was not taking
    into account when back pressure sampling was implemented.
   
    There is this old issue to track this:
    https://issues.apache.org/jira/browse/FLINK-3456
   
    I'm not aware of any other way to track this situation. Maybe others
    can chime in here...
   
    – Ufuk
   
   


Re: Back-pressure Status shows OK but records are backed up in kafka

Jins George
Thank you, Ufuk and Shannon. Since my Kafka consumer is Beam's
UnboundedKafkaSource, I'm not sure whether the records-lag-max metric
is exposed. Let me research further.

Thanks,
Jins George
On 01/08/2018 10:11 AM, Shannon Carey wrote:

> Right, backpressure only measures backpressure on the inside of the Flink job. Ie. between Flink tasks.
>
> Therefore, it’s up to you to monitor whether your Flink job is “keeping up” with the source stream. If you’re using Kafka, there’s a metric that the consumer library makes available. For example, for one of our jobs, in Graphite we have a metric that matches:
>
> aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max, 18, 19)
>
> The “$Job” is a variable which allows you to select the job. You can see that I have wildcards on other elements of the path, for example the TaskManager id, the operator name, the Task index, etc. Your metric is probably rooted somewhere else, but the thing you’re looking for is under operator.*.*.KafkaConsumer.records-lag-max.
>
> Flink manages its offsets itself, rather than acting like a “normal” consumer which commits offsets to Kafka. However, in the docs I see that “setCommitOffsetsOnCheckpoints()” is enabled by default.  So, theoretically you can use any sort of tool similar to https://github.com/srotya/kafka-lag-monitor or https://github.com/linkedin/Burrow etc. which polls Kafka itself and produces metrics about consumer lag. However, for some reason, I don’t see our Flink consumer metrics showing up in our lag monitoring tool or in the Kafka command-line tools, so I’m not sure what’s going on there. Maybe it’s because Flink doesn’t show up as a consumer group? At first I thought that it might be because we’re not setting the “group.id” property, but as it turns out we are indeed setting it. In any case, we have to use the job’s metrics, and monitor that the job is up, rather than monitoring the offset in Kafka itself.
>
> -Shannon