Checkpoint takes long with FlinkKafkaConsumer

Hironori Ogibayashi
Hello,

I am running a Flink job that reads topics from Kafka and writes results
to Redis. I use FsStateBackend with HDFS.

I noticed that taking a checkpoint takes several minutes and sometimes expires.
---
2016-06-14 17:25:40,734 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1456 (in 257956 ms)
2016-06-14 17:25:40,735 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1457 @ 1465892740734
2016-06-14 17:35:40,735 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 1457 expired before completing.
2016-06-14 17:35:40,736 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1458 @ 1465893340735
2016-06-14 17:45:40,736 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 1458 expired before completing.
2016-06-14 17:45:40,737 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1459 @ 1465893940736
2016-06-14 17:55:40,738 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 1459 expired before completing.
2016-06-14 17:55:40,739 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1460 @ 1465894540738
---
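
For reference, the numbers in the log above can be checked with a little arithmetic. The sketch below is only an illustration; the class and method names are made up for this example. It converts the reported duration of checkpoint 1456 into minutes and computes the gap between the trigger timestamps of checkpoints 1457 and 1458. The gap comes out at almost exactly ten minutes, which would be consistent with a 10-minute checkpoint timeout (an assumption, since the configured timeout is not stated in this thread).

```java
import java.time.Duration;

/** Worked arithmetic on the CheckpointCoordinator log lines quoted above (illustrative only). */
final class CheckpointTimings {

    /** Milliseconds between two "Triggering checkpoint N @ <epochMillis>" entries. */
    static long gapMillis(long earlierTriggerMillis, long laterTriggerMillis) {
        return laterTriggerMillis - earlierTriggerMillis;
    }

    /** Formats a millisecond duration as minutes and seconds, e.g. "4 min 17 s". */
    static String pretty(long millis) {
        Duration d = Duration.ofMillis(millis);
        return d.toMinutes() + " min " + (d.getSeconds() % 60) + " s";
    }

    public static void main(String[] args) {
        // Checkpoint 1456 was reported as completed "in 257956 ms".
        System.out.println("checkpoint 1456 took " + pretty(257_956L));

        // Trigger timestamps of checkpoints 1457 and 1458, copied from the log.
        long gap = gapMillis(1_465_892_740_734L, 1_465_893_340_735L);
        System.out.println("1457 -> 1458 trigger gap: " + gap + " ms (" + pretty(gap) + ")");
    }
}
```

Each expired checkpoint is followed by the next trigger about one millisecond later, so the gap between triggers is effectively the time the coordinator waited before giving up.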

According to the WebUI, the checkpoint size is just 1 MB. Why does
checkpointing take so long?

I took a jstack dump during checkpointing. It looks like the checkpointing
thread is blocked in commitOffsets.

---
"Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry [0x00007f2b3ddfc000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
        - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
        - locked <0x0000000659111cc8> (a java.lang.Object)
        at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
---

The thread holding that lock is this one:

---
"Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
        - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at org.apache.kafka.common.network.Selector.select(Selector.java:425)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
        - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
---
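
The two stack traces above are linked by the monitor address 0x0000000659111b58: one thread is "waiting to lock" it and the other has it "locked". In a larger dump this matching can be tedious by hand, so here is a minimal sketch of how it could be automated. The class name, the method name, and the abbreviated sample dump are hypothetical and only for illustration; the parsing assumes the HotSpot jstack text layout shown in this thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Pairs "waiting to lock <addr>" entries with the thread that has "locked <addr>". */
final class MonitorOwners {
    // A thread header line starts with the quoted thread name.
    private static final Pattern THREAD = Pattern.compile("^\"([^\"]+)\"");
    private static final Pattern LOCKED = Pattern.compile("- locked <(0x[0-9a-fA-F]+)>");
    private static final Pattern WAITING = Pattern.compile("- waiting to lock <(0x[0-9a-fA-F]+)>");

    /** Abbreviated sample taken from the two stack traces in this post. */
    static final String SAMPLE =
        "\"Async calls on Source: Custom Source -> Flat Map (2/3)\" daemon prio=10\n"
        + "   java.lang.Thread.State: BLOCKED (on object monitor)\n"
        + "        - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)\n"
        + "        - locked <0x0000000659111cc8> (a java.lang.Object)\n"
        + "\"Thread-9\" daemon prio=10\n"
        + "   java.lang.Thread.State: RUNNABLE\n"
        + "        - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)\n";

    /** Returns one "waiter <- owner @ monitor" entry per blocked thread whose owner is in the dump. */
    static List<String> findBlockers(String dump) {
        Map<String, String> ownerByMonitor = new LinkedHashMap<>();
        Map<String, String> monitorByWaiter = new LinkedHashMap<>();
        String current = null;
        for (String line : dump.split("\n")) {
            Matcher t = THREAD.matcher(line);
            if (t.find()) {
                current = t.group(1);
                continue;
            }
            if (current == null) {
                continue; // preamble before the first thread header
            }
            Matcher l = LOCKED.matcher(line);
            if (l.find()) {
                ownerByMonitor.put(l.group(1), current);
            }
            Matcher w = WAITING.matcher(line);
            if (w.find()) {
                monitorByWaiter.put(current, w.group(1));
            }
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : monitorByWaiter.entrySet()) {
            String owner = ownerByMonitor.get(e.getValue());
            if (owner != null) {
                result.add(e.getKey() + " <- " + owner + " @ " + e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        findBlockers(SAMPLE).forEach(System.out::println);
    }
}
```

For the sample, the single result names the "Async calls" thread as the waiter and "Thread-9" as the owner of the KafkaConsumer monitor, matching what was read off the dump by hand.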

If someone could advise me of the cause or the way to investigate
further, that would be appreciated.

Thanks,
Hironori

Re: Checkpoint takes long with FlinkKafkaConsumer

Kostas Kloudas
Hello Hironori,

Are you using the latest Flink version?
There were some changes in the FlinkConsumer in the latest releases.

Thanks,
Kostas


Re: Checkpoint takes long with FlinkKafkaConsumer

Hironori Ogibayashi
Kostas,

Thank you for your response.
Yes, I am using the latest Flink release, which is 1.0.3.

Thanks,
Hironori


Re: Checkpoint takes long with FlinkKafkaConsumer

Kostas Kloudas
Hi Hironori,

Could you also provide the TaskManager logs?

As you described, it seems that the consumer is stuck in the polling loop,
although Flink polls with a timeout. Normally this means that the lock should
be released periodically so that the checkpoints can go through.

The TaskManager logs can help clarify why this does not happen.
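
To make the locking pattern concrete, here is a simplified model of it. This is not Flink's actual code: the class, the plain `Object` lock standing in for the KafkaConsumer monitor, and the `Thread.sleep` standing in for a blocking `poll(timeout)` are all assumptions for illustration. It shows that a commit which needs the same monitor has to wait until the current poll round returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Simplified model: a poll round and an offset commit contend for one monitor. */
final class PollLockSketch {
    // Stands in for the KafkaConsumer instance both threads synchronize on.
    private final Object consumerLock = new Object();

    /** One poll round: holds the lock for the whole simulated poll timeout. */
    void pollOnce(long pollTimeoutMillis, CountDownLatch lockHeld) throws InterruptedException {
        synchronized (consumerLock) {
            lockHeld.countDown();            // signal that the lock is now held
            Thread.sleep(pollTimeoutMillis); // simulated blocking poll; sleep keeps the monitor
        }
    }

    /** Simulated offset commit; returns how long it waited for the lock, in milliseconds. */
    long commitOffsets() {
        long start = System.nanoTime();
        synchronized (consumerLock) {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }
    }

    /** Runs one poll round on a background thread and measures the commit's wait. */
    static long demo(long pollTimeoutMillis) {
        PollLockSketch sketch = new PollLockSketch();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                sketch.pollOnce(pollTimeoutMillis, lockHeld);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        try {
            lockHeld.await();                // commit only after the poller owns the lock
            long waited = sketch.commitOffsets();
            poller.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("commit waited ~" + demo(300) + " ms for the poll round to finish");
    }
}
```

In this model the commit gets through as soon as the single poll round ends. If the polling thread re-entered the synchronized block immediately in a loop, the committing thread could wait much longer, because Java's intrinsic monitors give no fairness guarantee.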

Thanks,
Kostas


Re: Checkpoint takes long with FlinkKafkaConsumer

Hironori Ogibayashi
Kostas,

I have attached a log file from one of the TaskManagers (the same host
on which I ran jstack).
I noticed that there are lots of "Marking the coordinator 2147482645
dead" messages in the log.
MyContinuousProcessingTimeTriggerGlobal in the log is my custom
trigger, which is based on ContinuousProcessingTimeTrigger but cleans up
windows when it receives specific log records.
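
A side note on the number in "Marking the coordinator 2147482645 dead": to my knowledge, the Kafka 0.9 consumer uses a separate connection to the group coordinator and gives it the id `Integer.MAX_VALUE - <broker id>`, so the broker behind that message can be recovered by subtraction. This convention is an assumption here and worth checking against the Kafka client source for the version in use; the class and method names below are made up for the example.

```java
/** Recovers the broker id from a coordinator id, assuming id = Integer.MAX_VALUE - brokerId. */
final class CoordinatorId {

    static int brokerIdFor(int coordinatorId) {
        return Integer.MAX_VALUE - coordinatorId;
    }

    public static void main(String[] args) {
        // Coordinator id taken from the log message quoted above.
        System.out.println("broker id: " + brokerIdFor(2147482645)); // prints "broker id: 1002"
    }
}
```

If the assumption holds, the messages point at the broker with id 1002, which would be the first place to look for connectivity or availability problems.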

Thanks,
Hironori

Attachment: flink-flink-taskmanager-0-FLINK1503.log.gz (122K)

Re: Checkpoint takes long with FlinkKafkaConsumer

Kostas Kloudas
Hello Hironori,

The logs just show that you get stuck in the Kafka consumer polling loop,
which does not allow the consumer lock to be released. Thus the Flink
part of the consumer is never actually called.

To my understanding, this does not seem to be a Flink issue,
or at least the logs do not show that it is.

From googling a bit, I found this:


which relates the problem to network issues.

Have you tried posting the problem to the Kafka mailing list as well?
Could it be that the Kafka broker fails and tries to reconnect but does not
make it?

Kostas

On Jun 14, 2016, at 2:59 PM, Hironori Ogibayashi <[hidden email]> wrote:

Kostas,

I have attached a log file from one of the taskManager. (The same host
I executed jstack)
I noticed that there are lots of "Marking the coordinator 2147482645
dead" message in the log.
MyContinuousProcessingTimeTriggerGlobal in the log is my custom
trigger which is based on
ContinuousProcessingTimeTrigger but clean up windows when it received
specific log records.

Thanks,
Hironori

2016-06-14 21:23 GMT+09:00 Kostas Kloudas <[hidden email]>:
Hi Hironori,

Could you also provide the logs of the taskManager?

As you described, it seems that the consumer is stuck in the polling loop, although Flink polls with
a timeout. This would normally mean that periodically it should release the lock for the checkpoints to go through.

The logs of the task manager can help at clarifying why this does not happen.

Thanks,
Kostas

On Jun 14, 2016, at 12:48 PM, Hironori Ogibayashi <[hidden email]> wrote:

Kostas,

Thank you for your response.
Yes, I am using latest Flink, which is 1.0.3.

Thanks,
Hironori

2016-06-14 19:02 GMT+09:00 Kostas Kloudas <[hidden email]>:
Hello Hironori,

Are you using the latest Flink version?
There were some changes in the FlinkConsumer in the latest releases.

Thanks,
Kostas

On Jun 14, 2016, at 11:52 AM, Hironori Ogibayashi <[hidden email]> wrote:

Hello,

I am running Flink job which reads topics from Kafka and write results
to Redis. I use FsStatebackend with HDFS.

I noticed that taking checkpoint takes serveral minutes and sometimes expires.
---
2016-06-14 17:25:40,734 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Completed checkpoint 1456 (in 257956 ms)
2016-06-14 17:25:40,735 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Triggering checkpoint 1457 @ 1465892740734
2016-06-14 17:35:40,735 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Checkpoint 1457 expired before completing.
2016-06-14 17:35:40,736 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Triggering checkpoint 1458 @ 1465893340735
2016-06-14 17:45:40,736 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Checkpoint 1458 expired before completing.
2016-06-14 17:45:40,737 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Triggering checkpoint 1459 @ 1465893940736
2016-06-14 17:55:40,738 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Checkpoint 1459 expired before completing.
2016-06-14 17:55:40,739 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Triggering checkpoint 1460 @ 1465894540738
---

According to WebUI, checkpoint size is just 1MB. Why checkpointing
takes so long?

I took jstack during checkpointing. It looks that checkpointing thread
is blocked in commitOffsets.

----
"Async calls on Source: Custom Source -> Flat Map (2/3)" daemon
prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry
[0x00007f2b3ddfc000]
java.lang.Thread.State: BLOCKED (on object monitor)
     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
     - waiting to lock <0x0000000659111b58> (a
org.apache.kafka.clients.consumer.KafkaConsumer)
     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
     - locked <0x0000000659111cc8> (a java.lang.Object)
     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:745)
---

This is the thread that holds the lock:

---
"Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000]
java.lang.Thread.State: RUNNABLE
     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
     - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
     - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
     - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
     at org.apache.kafka.common.network.Selector.select(Selector.java:425)
     at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
     - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
---
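
The two dumps above show one thread waiting to lock `<0x0000000659111b58>` while another thread holds that same monitor inside `poll`. The sketch below reproduces that pattern in isolation. It is not Flink or Kafka code: `consumerLock`, `commitWaitMillis`, and the sleep that stands in for a long `poll` are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class MonitorContentionSketch {

    // Returns how long (in ms) the calling thread had to wait for the monitor
    // while a "poller" thread held it for pollMillis.
    static long commitWaitMillis(long pollMillis) {
        final Object consumerLock = new Object();          // stand-in for the shared consumer object
        final CountDownLatch lockHeld = new CountDownLatch(1);

        Thread poller = new Thread(() -> {
            synchronized (consumerLock) {
                lockHeld.countDown();                      // signal: monitor is now owned
                sleepQuietly(pollMillis);                  // stand-in for a poll that does not return
            }
        }, "poller");
        poller.setDaemon(true);
        poller.start();

        try {
            lockHeld.await();                              // wait until the poller owns the monitor
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        long start = System.nanoTime();
        synchronized (consumerLock) {
            // stand-in for the offset commit; the thread is BLOCKED until the poller exits
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);                          // sleeping does not release the monitor
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("commit thread waited about " + commitWaitMillis(300) + " ms for the monitor");
    }
}
```

The wait grows with however long the holder stays inside its synchronized block, so a poll that does not return within its timeout would keep the committing thread blocked for the same duration.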

If someone could advise me of the cause or the way to investigate
further, that would be appreciated.

Thanks,
Hironori


<flink-flink-taskmanager-0-FLINK1503.log.gz>


Re: Checkpoint takes long with FlinkKafkaConsumer

Hironori Ogibayashi
Kostas,

Thank you for your advice. I have posted my question to the Kafka mailing list.
I think the Kafka brokers are fine, because there are no errors on the
producer side at 15,000 msg/sec, and according to OS metrics all of my
brokers receive almost the same amount of network traffic.

Thanks,
Hironori




2016-06-14 22:40 GMT+09:00 Kostas Kloudas <[hidden email]>:

> Hello Hironori,
>
> The logs just show that you get stuck in the Kafka consumer polling loop,
> which does not allow the consumer lock to be released. Thus the Flink
> part of the consumer is never actually called.
>
> To my understanding this does not seem to be a Flink issue.
> Or at least this is not shown from the logs.
>
> From googling a bit, I found this:
>
> http://stackoverflow.com/questions/35636739/kafka-consumer-marking-the-coordinator-2147483647-dead
>
> which relates the problem to network issues.
>
> Have you tried posting the problem also to the Kafka mailing list?
> Can it be that the kafka broker fails and tries to reconnect but does not
> make it?
>
> Kostas
>
> On Jun 14, 2016, at 2:59 PM, Hironori Ogibayashi <[hidden email]>
> wrote:
>
> Kostas,
>
> I have attached a log file from one of the taskManager. (The same host
> I executed jstack)
> I noticed that there are lots of "Marking the coordinator 2147482645
> dead" message in the log.
> MyContinuousProcessingTimeTriggerGlobal in the log is my custom
> trigger which is based on
> ContinuousProcessingTimeTrigger but clean up windows when it received
> specific log records.
>
> Thanks,
> Hironori
>
> 2016-06-14 21:23 GMT+09:00 Kostas Kloudas <[hidden email]>:
>
> Hi Hironori,
>
> Could you also provide the logs of the taskManager?
>
> As you described, it seems that the consumer is stuck in the polling loop,
> although Flink polls with
> a timeout. This would normally mean that periodically it should release the
> lock for the checkpoints to go through.
>
> The logs of the task manager can help at clarifying why this does not
> happen.
>
> Thanks,
> Kostas
>
> On Jun 14, 2016, at 12:48 PM, Hironori Ogibayashi <[hidden email]>
> wrote:
>
> Kostas,
>
> Thank you for your response.
> Yes, I am using latest Flink, which is 1.0.3.
>
> Thanks,
> Hironori
>
> 2016-06-14 19:02 GMT+09:00 Kostas Kloudas <[hidden email]>:
>
> Hello Hironori,
>
> Are you using the latest Flink version?
> There were some changes in the FlinkConsumer in the latest releases.
>
> Thanks,
> Kostas
>

Re: Checkpoint takes long with FlinkKafkaConsumer

Ufuk Celebi
Hey Hironori,

thanks for reporting this. Could you please update this thread when
you have more information from the Kafka list?

– Ufuk



Re: Checkpoint takes long with FlinkKafkaConsumer

Hironori Ogibayashi
Ufuk,

Yes, of course. I will be sure to update this thread when I get more information.

Hironori


Re: Checkpoint takes long with FlinkKafkaConsumer

Ufuk Celebi
Thanks :)

>>>>      - locked <0x0000000659111b58> (a
>>>> org.apache.kafka.clients.consumer.KafkaConsumer)
>>>> ---
>>>>
>>>> If someone could advise me of the cause or the way to investigate
>>>> further, that would be appreciated.
>>>>
>>>> Thanks,
>>>> Hironori
>>>>
>>>>
>>>>
>>>> <flink-flink-taskmanager-0-FLINK1503.log.gz>
>>>>
>>>>

Re: Checkpoint takes long with FlinkKafkaConsumer

Hironori Ogibayashi
Hi,

Sorry for the late response.
I received no response on the Kafka mailing list and still
have not found the root cause.
However, when I use FlinkKafkaConsumer082, I do not encounter this issue,
so I will use FlinkKafkaConsumer082.

Thanks
Hironori


Re: Checkpoint takes long with FlinkKafkaConsumer

Stephan Ewen
I looked into this a bit, and I think it is a Flink issue:

The blocking is between the poll() and the commitToKafka() calls.
The "commitToKafkaCall()" is not part of the checkpoint; it comes only after the checkpoint. So even if it is not called, this should not block the checkpoint.

What may happen is the following: 
  - Only one async call on the Kafka Consumer may be in progress at any time
  - The triggerCheckpoint() async call cannot happen, because the "notifyCheckpointComplete() -> commitToKafka()" call is stuck.
  - The "notifyCheckpointComplete() -> commitToKafka()" call is stuck because, even though the poll() call may release the lock, it reacquires it too fast. The lock is not fair, after all.
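
To make the last point concrete, here is a small standalone sketch in plain Java. It is not the Flink code: the class MonitorBargingDemo, its methods, and the timings are made up for illustration. One thread re-enters a synchronized block in a tight loop, the way a poll loop would, while a second thread measures how long it waits to enter the same monitor once.

```java
import java.util.concurrent.TimeUnit;

final class MonitorBargingDemo {

    // Stands in for the KafkaConsumer object that both threads synchronize on.
    private static final Object CONSUMER_MONITOR = new Object();
    private static int commits = 0; // guarded by CONSUMER_MONITOR

    /**
     * Starts a poll-like loop that re-enters the monitor right after each
     * iteration, then measures how long a second thread waits to enter the
     * same monitor once. Returns the wait in milliseconds.
     */
    static long measureCommitWaitMillis(int pollIterations, long pollMillis) {
        Thread poller = new Thread(() -> {
            for (int i = 0; i < pollIterations; i++) {
                synchronized (CONSUMER_MONITOR) {
                    pause(pollMillis); // simulates a blocking poll with a timeout
                }
                // The monitor is free only for an instant before the next iteration.
            }
        }, "poll-loop");
        poller.setDaemon(true);
        poller.start();
        pause(pollMillis / 2); // give the poll loop a head start

        long start = System.nanoTime();
        synchronized (CONSUMER_MONITOR) {
            commits++; // simulates the short offset-commit critical section
        }
        long waitedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        try {
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waitedMillis;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        long waited = measureCommitWaitMillis(200, 10);
        System.out.println("commit thread waited " + waited + " ms for the monitor");
    }
}
```

How long the second thread waits depends on the JVM and the scheduler. The point is only that intrinsic monitors give no fairness guarantee, so the wait is not bounded by a single poll timeout.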

Would you be up for trying something out?

  - In the FlinkKafkaConsumer09, create a fair "java.util.concurrent.locks.ReentrantLock", and wherever you have "synchronized (flinkKafkaConsumer.consumer) {", can you replace that with the fair lock instead?

If that solves it, we'll add that as a fix.
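
The change suggested above could look roughly like the following sketch. It is a standalone illustration under stated assumptions, not the actual FlinkKafkaConsumer09 code: the class FairConsumerLockSketch, its fields, and the placeholder bodies are hypothetical, and only java.util.concurrent.locks.ReentrantLock is a real API here.

```java
import java.util.concurrent.locks.ReentrantLock;

final class FairConsumerLockSketch {

    // 'true' selects the fair policy: under contention the longest-waiting thread gets the lock.
    private final ReentrantLock consumerLock = new ReentrantLock(true);

    private volatile boolean running = true;
    private long committedOffset = -1L; // guarded by consumerLock

    /** Stand-in for the poll loop in ConsumerThread.run(). */
    void pollLoop() {
        while (running) {
            consumerLock.lock();
            try {
                pause(10); // placeholder for the blocking poll on the Kafka consumer
            } finally {
                consumerLock.unlock();
            }
        }
    }

    /** Stand-in for commitOffsets(), reached via notifyCheckpointComplete(). */
    void commitOffsets(long offset) {
        consumerLock.lock();
        try {
            committedOffset = offset; // placeholder for the real offset commit
        } finally {
            consumerLock.unlock();
        }
    }

    /** Runs the poll loop on its own thread, commits once, and returns the stored offset. */
    static long commitWhilePolling(long offset) {
        FairConsumerLockSketch sketch = new FairConsumerLockSketch();
        Thread poller = new Thread(sketch::pollLoop, "poll-loop");
        poller.setDaemon(true);
        poller.start();
        pause(50); // let the poll loop take the lock a few times
        sketch.commitOffsets(offset);
        sketch.running = false;
        try {
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.committedOffset;
    }

    /** Reports whether the lock used by this sketch was created with the fair policy. */
    static boolean usesFairLock() {
        return new FairConsumerLockSketch().consumerLock.isFair();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("committed offset: " + commitWhilePolling(42L));
    }
}
```

With the fair policy, a thread that is already queued in commitOffsets() is served before the poll loop can take the lock again, at the cost of somewhat lower throughput than an unfair lock.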

Greetings,
Stephan




Re: Checkpoint takes long with FlinkKafkaConsumer

Hironori Ogibayashi
Stephan,

Thank you so much for your advice.

I have modified the FlinkKafkaConsumer09 source code and am running the job
again. It has been working well so far (just a few hours).
I will keep it running over the weekend.
Here are the details of the modification:
https://gist.github.com/ogibayashi/e5e72e7d47046cbf46cdc897e491a135

By the way, I would like to understand what may be happening.
There are two synchronized (flinkKafkaConsumer.consumer) blocks: one is
in commitOffsets() and the other is in
pollLoop in ConsumerThread.run(). Could it be that the
latter acquires the lock repeatedly while the former stays
blocked for a long time?
I could not find commitToKafka() or commitToKafkaCall() in the Flink code.

Thanks,
Hironori
