I have a Beam pipeline consuming records from Kafka, doing some
transformations, and writing them to HBase. I ran into an issue in which records were being written to HBase at a slower rate than messages were arriving in Kafka, due to a temporary surge in incoming traffic.

From the Flink UI, if I check the back pressure status, it shows OK. I have one task which contains all the operators, including the source.

Any idea why the backpressure indicator would show OK while messages are backed up in Kafka? Is there any other mechanism or metric by which I can identify this situation?

I'm running Flink 1.2 with Beam 2.0.

Thanks,
Jins George
Hey Jins,
our current back pressure tracking mechanism does not work with Kafka sources. To gather back pressure indicators we sample the main task thread of a subtask. For most tasks, this is the thread that emits records downstream (e.g. if you have a map function) and everything works as expected. In the case of the Kafka source, though, there is a separate thread that consumes from Kafka and emits the records. Therefore we sample the "wrong" thread and don't observe any indicators for back pressure. :-( Unfortunately, this was not taken into account when back pressure sampling was implemented.

There is an old issue tracking this:
https://issues.apache.org/jira/browse/FLINK-3456

I'm not aware of any other way to track this situation. Maybe others can chime in here...

– Ufuk
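P.S. To make the threading issue concrete, here is a simplified, hypothetical sketch of the pattern described above. It is not the actual Flink Kafka connector code; the class name and the pollExternalSystem() helper are made up for illustration.

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class ThreadedSource implements SourceFunction<String> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Thread fetcher = new Thread(() -> {
                while (running) {
                    String record = pollExternalSystem();   // e.g. a Kafka poll
                    synchronized (ctx.getCheckpointLock()) {
                        // Emission (and any blocking on full output buffers,
                        // i.e. back pressure) happens on this fetcher thread.
                        ctx.collect(record);
                    }
                }
            }, "source-fetcher");
            fetcher.start();
            // The main task thread, the one that back pressure sampling looks at,
            // just waits here and therefore always appears idle.
            fetcher.join();
        }

        @Override
        public void cancel() {
            running = false;
        }

        // Placeholder standing in for the external consumer call.
        private String pollExternalSystem() {
            return "record";
        }
    }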
Right, backpressure only measures backpressure on the inside of the Flink job, i.e. between Flink tasks.
Therefore, it’s up to you to monitor whether your Flink job is “keeping up” with the source stream. If you’re using Kafka, there’s a metric that the consumer library makes available. For example, for one of our jobs, in Graphite we have a metric that matches:

aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max, 18, 19)

The “$Job” is a variable which allows you to select the job. You can see that I have wildcards on other elements of the path, for example the TaskManager id, the operator name, the task index, etc. Your metric is probably rooted somewhere else, but the thing you’re looking for is under operator.*.*.KafkaConsumer.records-lag-max.

Flink manages its offsets itself, rather than acting like a “normal” consumer which commits offsets to Kafka. However, in the docs I see that “setCommitOffsetsOnCheckpoints()” is enabled by default. So, theoretically you can use any tool similar to https://github.com/srotya/kafka-lag-monitor or https://github.com/linkedin/Burrow that polls Kafka itself and produces metrics about consumer lag. However, for some reason I don’t see our Flink consumer metrics showing up in our lag monitoring tool or in the Kafka command-line tools, so I’m not sure what’s going on there. Maybe it’s because Flink doesn’t show up as a consumer group? At first I thought that it might be because we’re not setting the “group.id” property, but as it turns out we are indeed setting it. In any case, we have to use the job’s metrics, and monitor that the job is up, rather than monitoring the offset in Kafka itself.

-Shannon
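P.S. In case it's useful, here is a minimal sketch of what that wiring looks like with Flink's own Kafka connector (Jins is on Beam's Kafka source, which is configured differently). The servers, topic, group id, and checkpoint interval below are placeholders.

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaLagVisibleJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Offsets are only committed back to Kafka on completed checkpoints.
            env.enableCheckpointing(60000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            props.setProperty("group.id", "my-flink-job");        // placeholder group id

            FlinkKafkaConsumer010<String> consumer =
                    new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
            // On by default according to the docs; shown here for clarity. Commits the
            // checkpointed offsets to Kafka so tools that poll Kafka can compute lag.
            consumer.setCommitOffsetsOnCheckpoints(true);

            env.addSource(consumer)
               .print(); // stand-in sink for the example
            env.execute("kafka-lag-visible-job");
        }
    }

The per-operator KafkaConsumer metrics (records-lag-max and friends) come from the underlying Kafka client and show up under the operator's metric group, so they are exported by whichever metrics reporter (Graphite in our case) is configured in flink-conf.yaml.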
Thank you Ufuk & Shannon. Since my Kafka consumer is the
UnboundedKafkaSource from Beam, I'm not sure whether the records-lag-max metric is exposed. Let me research further.

Thanks,
Jins George
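P.S. For anyone who finds this later: a rough sketch of how consumer properties (for example a group.id that external lag monitors could key on) can be passed through Beam's KafkaIO, which is what produces the unbounded Kafka source. The servers, topic, and group id are placeholders, and I have not verified that this alone makes the lag visible to Kafka-side tools, since committing offsets back to Kafka depends on the connector/runner version.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaToHBasePipelineSketch {
        public static void main(String[] args) {
            Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

            Map<String, Object> consumerProps = new HashMap<>();
            consumerProps.put("group.id", "beam-hbase-writer"); // placeholder group id

            PCollection<KV<String, String>> records = pipeline.apply(
                    KafkaIO.<String, String>read()
                            .withBootstrapServers("kafka:9092")  // placeholder
                            .withTopic("events")                 // placeholder
                            .withKeyDeserializer(StringDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .updateConsumerProperties(consumerProps)
                            .withoutMetadata());

            // ... transformations and the HBase write would follow here ...

            pipeline.run();
        }
    }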