We have a Flink job with 7 subtasks (see graph below). This is on Flink 1.2.
Each source task consumes from a Kafka topic. The data rate is low, around 70-80 messages per second. What we are noticing is that after running for 2 hours or so, the source tasks start showing back pressure.
A thread dump shows a bunch of blocked threads like the following. Any idea what could be going on here?

"OutputFlusher" #87 daemon prio=5 os_prio=0 tid=0x0000000000b3b000 nid=0x73 waiting for monitor entry [0x00007f1d03af9000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:175)
        - waiting to lock <0x000000060076fa80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:185)

Waiting on:

"Time Trigger for Source:stream://com.tesseract.com/snmp_generic/envtemp -> FlatMap -> Map (1/1)" #92 daemon prio=5 os_prio=0 tid=0x0000000000c13000 nid=0x79 in Object.wait() [0x00007f1d032e5000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
        - locked <0x00000006063b5aa8> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        - locked <0x000000060076fa80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:106)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:681)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:663)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:389)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:681)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:663)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:389)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821)
        at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:142)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
        - locked <0x00000006038fc128> (a java.lang.Object)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

"Source:stream://com.tesseract.com/metricstream/v1 -> FlatMap -> Map (1/1)" #76 daemon prio=5 os_prio=0 tid=0x00007f1d240b5000 nid=0x68 in Object.wait() [0x00007f1d08b4e000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
        - locked <0x0000000602c8daa8> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        - locked <0x0000000606572e98> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.cisco.ndp.cep.impl.flinksiddhi.source.SourceToTupleFunction.flatMap(SourceToTupleFunction.java:142)
        at com.cisco.ndp.cep.impl.flinksiddhi.source.SourceToTupleFunction.flatMap(SourceToTupleFunction.java:40)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
        - locked <0x000000060387f028> (a java.lang.Object)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:245)
        - locked <0x000000060387f028> (a java.lang.Object)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.emitRecord(Kafka09Fetcher.java:198)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
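For reference, a job of the shape described above (a Kafka 0.9 source chained to a FlatMap and a Map, as the operator names in the thread dump suggest) would look roughly like the sketch below. The topic name, connection properties, record types, and the function bodies are illustrative assumptions, not the actual SourceToTupleFunction code.

// Minimal sketch of a Kafka source -> FlatMap -> Map pipeline similar to the
// one in the thread dump. Topic, group id, and types are assumptions.
import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class MetricPipelineSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // assumption
        props.setProperty("group.id", "metric-consumer");     // assumption

        // One such source per topic; the job in question has several of these.
        DataStream<String> raw = env.addSource(
                new FlinkKafkaConsumer09<>("metricstream", new SimpleStringSchema(), props));

        raw
            // Stand-in for the user's SourceToTupleFunction: parse each message
            // into zero or more records.
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) {
                    out.collect(value);
                }
            })
            // Stand-in for the chained Map operator.
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value;
                }
            })
            .print(); // downstream operators omitted in this sketch

        env.execute("Metric pipeline sketch");
    }
}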
Hi Aparup,

The slow-down can have multiple reasons.

One reason could be that your computation in Timeseries-Analytics becomes more complex over time and therefore slows down, resulting in back pressure at the sources. This could, for example, be caused by accumulating a large state. If your computation is stateful, the question here would be which state backend you are using.

Another problem could be garbage collection, which only kicks in after some time. You can get information about the current garbage collection by enabling the configuration settings described here [1]. This could, for example, indicate that you create a lot of objects and that this could be optimized. It could also indicate that you keep references to old objects which cannot be garbage collected. In order to debug this, you could also take a look at a heap dump of the running program when it is slow.

Another question would be whether you could upgrade to Flink 1.3.2 to see whether the problems exist there as well.

Cheers,
Till

On Sun, Aug 27, 2017 at 9:03 AM, Aparup Banerjee (apbanerj) <[hidden email]> wrote:
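As a side note on the state-backend question above: in Flink 1.2/1.3 the backend can be set explicitly on the execution environment; if nothing is configured, the heap-based MemoryStateBackend is used, which keeps all state and checkpoints in JVM memory and can increase GC pressure as state grows. The following is a minimal sketch, with the checkpoint path and the choice of backend as assumptions for illustration.

// Sketch: setting a state backend explicitly (Flink 1.2/1.3-style API).
// The checkpoint URI and the backend choice are assumptions.
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep working state on the TaskManager heap, but write checkpoints to a
        // file system instead of the JobManager heap (the MemoryStateBackend default).
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Alternatively, the RocksDB backend (flink-statebackend-rocksdb dependency)
        // keeps working state off-heap and is usually a better fit for large state:
        // env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // ... build and execute the job as usual ...
    }
}

Independent of the backend, the GC logging mentioned in the reply can typically be enabled by passing the usual JVM flags (for example -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps) through env.java.opts in flink-conf.yaml; the documented settings are those referenced as [1] above.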