Stoppable Job And Web UI Questions

Stoppable Job And Web UI Questions

Yan Chou Chen
Several new questions:
- Stoppable job
I read threads mentioning that a streaming job can be stopped [1][2].
However, it looks like stop can only be invoked through the command
line. Is it possible to programmatically stop a streaming job from
within the job itself? For instance, when a Kafka consumer streaming
job reaches a predefined condition, could it call stop() from within,
e.g., a MapFunction?

- Web UI (jobmanager-host:8081) information
I have a Kafka consumer which reads records from Kafka. In the web
UI's Subtasks tab there is a "Records sent" column; does it show the
number of records read by the consumer? For instance, if I deliver,
say, 1k string records (SimpleStringSchema) to Kafka, can I expect the
web UI to display 1k "Records sent" once the consumer has read all of
them?

This leads to another question. I have a streaming job that uses a map
function, e.g. stream.map(new MyMapFunction). Within the MyMapFunction
implementation I count each input and write the count to an external
store. Later I sum the counts across the parallelism used. For
example, running map(MyMapFunction) with a parallelism of 4, the
MyMapFunction instances process 400, 500, 400, and 500 records
respectively, so the sum is 1800. However, this differs from the web
UI, which shows a higher "Records sent", e.g. 8k. Does that mean
"Records sent" in the web UI does not count the records processed by
MyMapFunction? How should I interpret the value in this column, and
how can I tell whether all messages delivered to Kafka have been fully
processed, i.e. 1k records delivered to Kafka and 1k records read out
of it?

Thanks.

[1] http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3C57155C30.8010401@...%3E

[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/StoppableFunction.html

Re: Stoppable Job And Web UI Questions

Till Rohrmann

Hi Yan Chou Chen,

  1. At the moment, Flink sources have to implement a specific interface, StoppableFunction, to be stoppable. If they do, you can stop them via the CLI or the web interface. This cannot be triggered from within the job itself.

    However, there is a better way to properly terminate a Flink job with your custom sources: simply let the SourceFunction terminate (i.e. leave its read loop) once it detects that the termination criterion has been met. Once all sources have done so, the job terminates properly and goes into the state FINISHED. This has the advantage that the sources reach a consensus on when to terminate; otherwise there might be a dictator which orders the other tasks to stop even though they might still have some work left to do.
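A minimal sketch of that pattern in plain Java (Flink types elided so it runs standalone; in a real job the class would implement Flink's SourceFunction<T>, whose run(SourceContext<T>)/cancel() methods have the same shape, and `limit` stands in for whatever termination criterion you choose):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class SelfTerminatingSource {
    private volatile boolean running = true;      // flipped by cancel()
    private final AtomicLong emitted = new AtomicLong();
    private final long limit;                     // hypothetical termination criterion

    public SelfTerminatingSource(long limit) {
        this.limit = limit;
    }

    // Corresponds to SourceFunction#run(SourceContext<T>): emit records in a
    // loop and simply return once the termination condition is met.
    public void run(Consumer<Long> collector) {
        while (running && emitted.get() < limit) {
            collector.accept(emitted.incrementAndGet());
        }
        // leaving the loop ends the task; once all source tasks have returned,
        // the job goes into the state FINISHED
    }

    // Corresponds to SourceFunction#cancel()
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        SelfTerminatingSource source = new SelfTerminatingSource(3);
        source.run(v -> System.out.println("record " + v));
    }
}
```

Because every source instance decides for itself when its loop is done, no external stop signal is needed and no records are cut off mid-flight.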

  2. The number of records sent is the sum of all records sent by this task. These records include watermarks as well as the actual stream records containing your data (read from Kafka). As such, this number is always an upper bound on the number of records your source actually read (e.g. from Kafka).

  3. Given that others might also deliver messages to the same Kafka topic, and that the topic has multiple partitions, it is not easy to know exactly when your 1000 messages have been processed.

If you’re the only one writing to this Kafka topic, you can use an accumulator to count the number of messages processed. Accumulators are updated live in the web UI's task overview (click on the job and then on the Accumulators tab).

// needs: org.apache.flink.api.common.accumulators.IntCounter,
//        org.apache.flink.api.common.functions.RichMapFunction,
//        org.apache.flink.configuration.Configuration

input.map(new RichMapFunction<Integer, Integer>() {
    private IntCounter intCounter;

    @Override
    public void open(Configuration config) {
        // register the accumulator under the name shown in the web UI
        intCounter = getRuntimeContext().getIntCounter("messages");
    }

    @Override
    public Integer map(Integer value) throws Exception {
        intCounter.add(1); // count every record this subtask sees
        return value;
    }
});
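The accumulator values reported by the parallel subtasks are merged on the JobManager (for an IntCounter this is a plain sum), which matches the manual summation described in the question. A plain-Java illustration of that merge, using the question's hypothetical per-subtask numbers:

```java
import java.util.Arrays;
import java.util.List;

public class AccumulatorMergeSketch {
    // Mirrors IntCounter's merge semantics: the per-subtask values are
    // summed into one job-wide count.
    static int merge(List<Integer> perSubtaskCounts) {
        return perSubtaskCounts.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        // per-subtask counts with parallelism 4, as in the question
        List<Integer> counts = Arrays.asList(400, 500, 400, 500);
        System.out.println("merged \"messages\" count: " + merge(counts)); // 1800
    }
}
```

So the single "messages" value in the Accumulators tab already reflects all parallel instances; no external counting store is needed.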

Cheers,
Till


On Wed, Jun 22, 2016 at 4:39 PM, Yan Chou Chen <[hidden email]> wrote: