Flink Kafka consumer with low latency requirement


Flink Kafka consumer with low latency requirement

xwang355

Dear Flink experts,

I am experimenting with Flink for a use case that has tight latency requirements.

A Stack Overflow article suggests that I can use setParallelism(n) to process a Kafka partition in a multi-threaded way. My understanding is that there is still one Kafka consumer per partition, but by using setParallelism I can spin up multiple worker threads to process the messages read from that consumer.

According to Fabian's comments in this thread:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-Flink-Kafka-connector-has-max-pending-offsets-concept-td28119.html
Flink is able to manage the offsets correctly (committing them in the right order).

Here is my question: let's say there is a Kafka topic with only one partition, and I set up a consumer with setParallelism(2). Hypothetically, the worker threads call out to a REST service that may get slow or stuck periodically. I want to make sure that the consumer as a whole keeps making progress even in the face of a 'slow worker'. In other words, I'd like the fast worker to build up multiple pending but uncommitted offsets even while the other worker is stuck. Is there such a knob to tune in Flink?

From my own experiment, I used the Kafka consumer group tool to monitor the offset lag: as soon as one worker thread gets stuck, the other cannot make any progress either. I really want the fast worker to keep progressing to a certain extent. For this use case, exactly-once processing is not required.

Thanks for helping.
Ben

  

Re: Flink Kafka consumer with low latency requirement

Fabian Hueske-2
Hi Ben,

Flink's Kafka consumers track their progress independently of any worker.
They keep track of the reading offsets themselves (committing progress to Kafka is optional and only needed for progress monitoring via Kafka's metrics).
As soon as a consumer reads and forwards an event, it is considered read. This means the progress of the downstream workers does not influence the progress tracking at all.

In the case of a topic with a single partition, you can use a consumer with parallelism 1 and connect a worker task with a higher parallelism to it.
The single consumer task will send the read events round-robin to the worker tasks.
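
For illustration, a minimal sketch of that setup (the topic name, properties, and the worker MapFunction are placeholders; env is the StreamExecutionEnvironment):

      FlinkKafkaConsumer<String> consumer =
              new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);

      DataStream<String> messageStream = env
              .addSource(consumer)
              .setParallelism(1);           // a single consumer task for the single partition

      messageStream
              .rebalance()                  // distribute the read events round-robin
              .map(new MyWorkerFunction())  // placeholder for the actual worker logic
              .setParallelism(4)            // several worker tasks behind the one consumer
              .print();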

Best, Fabian

On Fri, Jun 21, 2019 at 05:48, wang xuchen <[hidden email]> wrote:


Re: Flink Kafka consumer with low latency requirement

xwang355
Fabian,

Thank you for replying.

If I understand your previous comment correctly, I set up a consumer with parallelism 1 and connect a worker task with parallelism 2.

If worker thread one makes a blocking call and is stuck for 60s, the consumer thread should continue fetching from the partition and feeding thread two. From my reading of the Flink documentation, if checkpointing is enabled, the consumer should commit its own internal offsets back to Kafka to show progress to external monitoring tools (see the sketch just below).
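
A minimal sketch of the consumer setup I am assuming (the topic name and properties are placeholders; committing offsets on checkpoints is the default, shown explicitly here):

      env.enableCheckpointing(2000);   // checkpoint every 2 seconds

      FlinkKafkaConsumer<String> consumer =
              new FlinkKafkaConsumer<>("kf-events", new SimpleStringSchema(), properties);

      // Commit the consumer's internal offsets back to Kafka whenever a checkpoint
      // completes, so that external tools such as the consumer-group tool see progress.
      consumer.setCommitOffsetsOnCheckpoints(true);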

If that's the case, then during the 60s when thread one is stuck, checkpoints should all succeed and thread two should keep chugging along merrily, even though the highest committed offset is one less than the offset held by thread one. After the 60s, I should see a huge jump in the monitoring tool, because thread one has released its offset and all offsets consumed by thread two during the 60s can be committed.

However, what I have observed is that as soon as thread one gets stuck, checkpointing is choked, the consumer thread stops feeding thread two, and the whole pipeline becomes stagnant.


Here are the thread dumps of the two worker threads:

"Map (1/2)" #59 prio=5 os_prio=31 tid=0x00007ffe29049800 nid=0xb603 in Object.wait() [0x0000700007520000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:539)
        - locked <0x00000007baf84040> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Map (2/2)" #58 prio=5 os_prio=31 tid=0x00007ffe25057000 nid=0x14803 waiting on condition [0x000070000741d000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at com.dataartisans.ReadFromKafka.doWork(ReadFromKafka.java:94)
        at com.dataartisans.ReadFromKafka.access$000(ReadFromKafka.java:28)
        at com.dataartisans.ReadFromKafka$1.map(ReadFromKafka.java:158)
        at com.dataartisans.ReadFromKafka$1.map(ReadFromKafka.java:151)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        - locked <0x00000007bb06e538> (a java.lang.Object)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

Could you please help me understand this behavior?

Thanks again.
Ben




Re: Flink Kafka consumer with low latency requirement

xwang355
In reply to this post by Fabian Hueske-2
Fabian,

Does the above stack trace look like a deadlock?

        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:539)
        - locked <0x00000007baf84040> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)




Re: Flink Kafka consumer with low latency requirement

Fabian Hueske-2
Hi,

What kind of function do you use to implement the operator that has the blocking call?
Did you have a look at the AsyncIO operator? It was designed for exactly such use cases.
It issues multiple asynchronous requests to an external service and waits for the response.
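
For illustration, a rough sketch of such an AsyncFunction wrapping a blocking REST client in its own thread pool (the class name RestLookupFunction, the method callRestService, the pool size, timeout, and capacity of 100 are placeholders to adapt; messageStream is assumed to be the Kafka source stream):

      import java.util.Collections;
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.TimeUnit;

      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.AsyncDataStream;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.functions.async.ResultFuture;
      import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

      public class RestLookupFunction extends RichAsyncFunction<String, String> {

              private transient ExecutorService pool;

              @Override
              public void open(Configuration parameters) {
                      // Run the blocking REST calls on a separate pool, off the task thread.
                      pool = Executors.newFixedThreadPool(10);
              }

              @Override
              public void asyncInvoke(String value, ResultFuture<String> resultFuture) {
                      CompletableFuture
                              .supplyAsync(() -> callRestService(value), pool)
                              .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
              }

              @Override
              public void timeout(String input, ResultFuture<String> resultFuture) {
                      // A slow or stuck request only affects this record;
                      // here the input is simply forwarded unchanged.
                      resultFuture.complete(Collections.singleton(input));
              }

              @Override
              public void close() {
                      pool.shutdown();
              }

              private String callRestService(String value) {
                      return value; // placeholder for the real (possibly slow) REST call
              }
      }

      // Up to 100 requests may be in flight per parallel instance; results are emitted
      // as they complete, so a single slow request does not block the others.
      DataStream<String> enriched = AsyncDataStream.unorderedWait(
              messageStream, new RestLookupFunction(), 30, TimeUnit.SECONDS, 100);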

Best, Fabian

On Mon, Jun 24, 2019 at 17:01, xwang355 <[hidden email]> wrote:

Re: Flink Kafka consumer with low latency requirement

xwang355
The author has deleted this message.

Re: Flink Kafka consumer with low latency requirement

xwang355
      env.enableCheckpointing(2000);
      FlinkKafkaConsumer<String> consumer =
              new FlinkKafkaConsumer<>("kf-events", new SimpleStringSchema(), properties2);
      ...
      messageStream.rebalance().map(new MapFunction<String, String>() {
              ...
              @Override
              public String map(String value) throws Exception {
                      long tid = Thread.currentThread().getId();
                      doWork(tid);
                      return value;
              }
      }).setParallelism(2).print();
...

// State shared by the map subtasks running in the same JVM (local test).
private static final ConcurrentSkipListSet<Long> sortedTid = new ConcurrentSkipListSet<>();
private static int counter = 0;

// The thread with the lowest tid sleeps for 60s once the counter reaches 1000.
private static void doWork(long tid) throws InterruptedException {
        if (!sortedTid.contains(tid)) {
                sortedTid.add(tid);
        }

        // Simulate a straggler: make the thread with the lowest tid the slow processor.
        if (sortedTid.first() == tid) {
                if (counter++ == 1000) {
                        Thread.sleep(60_000);   // block this worker for a full 60 seconds
                }
                Thread.sleep(20);
        } else {
                Thread.sleep(20);
        }
}

By using setParallelism(2), I expect to have two threads processing records from the same Kafka partition. However, if one thread is choked in doWork, the other thread cannot make progress either, and the consumer offset lag starts to build up as a result.

What I'd like to achieve is that the 'healthy' thread continues to make progress to some extent (not forever): even though the choked thread holds back the earlier offset from being committed, the subsequent records in the partition are still processed.

If the unhealthy thread eventually aborts, it is OK to reprocess those offsets again. For example, if doWork does credit card fraud checking, I don't want one bad transaction to hold up all customers' credit card usage.
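
Since exactly-once is not required and reprocessing is acceptable, I assume the checkpointing mode could also be relaxed. A minimal sketch of what I have in mind:

      // At-least-once is enough for this use case: records held by a stuck or
      // failed worker may simply be reprocessed after recovery.
      env.enableCheckpointing(2000, CheckpointingMode.AT_LEAST_ONCE);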
