Backpressure from producer with flink connector kinesis 1.4.2

Backpressure from producer with flink connector kinesis 1.4.2

Liu, Gavin (CAI - Atlanta)

Hi guys,

 

I am new to the Flink framework. We are building an application that uses Kinesis streams for both the Flink source and the sink.

The Flink version we are using is 1.4.2, which is also the version of flink-connector-kinesis. We built the flink-connector-kinesis jar explicitly with KPL version 0.12.6 because of known problems with the default 0.12.5.

 

I got a rough idea of how backpressure works in Flink by reading http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CF8DD76C0-9DE0-412A-8C24-B72AF0D4211B@...%3E

 

From my experiments with Flink and flink-connector-kinesis, backpressure only arises within Flink's processing operators, not in the Flink producer that writes to Kinesis.

More specifically, when the throughput from the KPL exceeds the Kinesis throughput limits, Flink does not slow down at all; that is, no pressure is applied back along the processing chain to the Flink consumer.

Correct me if I misunderstood this, but it looks like the Flink producer (in flink-connector-kinesis) is a standalone component: once a record is collected and handed to the producer, Flink core considers its processing finished and no longer cares about the fate of that record; it is the connector's responsibility to carry on from there.

I was expecting backpressure to propagate from the sink Kinesis stream back to the source Kinesis stream: whenever the sink stream cannot handle the volume, it should push back. Could someone explain a bit more why the Flink connector is designed this way? Also correct me if I stated anything wrong.
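For reference, here is a minimal sketch of the kind of Kinesis-to-Kinesis pipeline described above (the stream names, region, and the map step are placeholders, not our actual code):

    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

    public class KinesisToKinesisJob {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");           // placeholder region
            props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");  // credentials from the environment

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Source: read from one Kinesis stream.
            DataStream<String> input = env.addSource(
                    new FlinkKinesisConsumer<>("source-stream", new SimpleStringSchema(), props));

            // Sink: write to another Kinesis stream through the KPL-backed producer.
            FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), props);
            sink.setDefaultStream("sink-stream");
            sink.setDefaultPartition("0");

            input.map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) {
                            return value.toUpperCase();  // placeholder processing step
                        }
                    })
                 .addSink(sink);

            env.execute("kinesis-to-kinesis");
        }
    }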

 

 

Gavin Liu             

 

Reply | Threaded
Open this post in threaded view
|

Re: Backpressure from producer with flink connector kinesis 1.4.2

Tzu-Li (Gordon) Tai
Hi Gavin,

The problem is that the Kinesis producer currently does not propagate backpressure properly.
Records are added to the internally used KPL client’s queue without any size limit.

This is considered a bug, and there is already a pull request for it [1], which we should push towards being merged soon.
What the pull request essentially does is add an upper bound on the number of pending records in the KPL producer queue.
Once that upper bound is hit, input to the Kinesis producer sink is blocked, thereby propagating backpressure further upstream.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/6021
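
To make the blocking idea concrete, here is a rough sketch of the approach (illustrative names only, not the actual code in the pull request): the caller blocks while the KPL reports too many outstanding records, so a sink calling this from invoke() stalls and backpressure propagates upstream.

    import java.nio.ByteBuffer;

    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.UserRecordResult;
    import com.google.common.util.concurrent.ListenableFuture;

    // Sketch only: class, method, and field names are illustrative.
    public class BoundedKplWriter {

        private final KinesisProducer producer;
        private final int queueLimit;  // hypothetical upper bound on pending KPL records

        public BoundedKplWriter(KinesisProducer producer, int queueLimit) {
            this.producer = producer;
            this.queueLimit = queueLimit;
        }

        public ListenableFuture<UserRecordResult> write(String stream, String partitionKey, ByteBuffer data)
                throws InterruptedException {
            // Block while too many records are still queued inside the KPL native process.
            // A sink calling this from invoke() stalls here, pushing backpressure upstream.
            while (producer.getOutstandingRecordsCount() >= queueLimit) {
                producer.flush();   // ask the KPL to drain its queue
                Thread.sleep(100);  // crude wait; the real fix is more refined
            }
            return producer.addUserRecord(stream, partitionKey, data);
        }
    }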



Re: Backpressure from producer with flink connector kinesis 1.4.2

Liu, Gavin (CAI - Atlanta)

Thanks, Gordon. That was quick and very helpful.

I had tried some other alternatives to resolve this and finally thought about rewriting the FlinkKinesisProducer class for our needs. Glad that I asked before I started.

Really appreciate the quick response.

 

From: "Tzu-Li (Gordon) Tai" <[hidden email]>
Date: Wednesday, June 20, 2018 at 12:05 PM
To: "Liu, Gavin (CAI - Atlanta)" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

 

 

The problem is that the Kinesis producer currently does not propagate backpressure properly.

Records are added to the internally used KPL client’s queue, without any queue size limit.

 

This is considered a bug, and already has a pull request for it [1], which we should probably push towards being merged soon.

What the pull request essentially does, is adding an upper bound to the number pending records in the KPL producer queue.

Once the upper bound is hit, input to the Kinesis producer sink is blocked, and therefore propagating backpressure further upstream.

 

Cheers,

Gordon

 

[1] https://github.com/apache/flink/pull/6021

 

 

On 20 June 2018 at 6:00:30 PM, Liu, Gavin (CAI - Atlanta) ([hidden email]) wrote:

Hi guys,

 

I am new to flink framework. And we are building an application that takes kinesis stream for both flink source and sink.

The flink version we are using is 1.4.2, which is also the version for the flink-connector-kinesis. We built the flink-connector-kinesis jar explicitly with KPL version 0.12.6 due to the existing problems with default 0.12.5.

 

I get a rough idea how the backpressure works with flink through reading http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CF8DD76C0-9DE0-412A-8C24-B72AF0D4211B@...%3E

 

From my experiment with flink and flink-connector-kinesis, the back pressure only happens within flink processing operations, i.e., not in the flink producer to kinesis.

More specifically, when the throughput from KPL exceeds the kinesis throughput limitations, flink does not slow down at all, i.e., it does not add pressure on the processing chain up to the flink consumer.

Correct me if I misunderstood this. It looks like the flink producer (in the flink-connector-kinesis) is a standalone component, once a record is collected and sent to the producer, flink core finishes all the processing and does not care the fate of the record any more, it is the responsibility of the connector to continue the job.

I am expecting back pressure to happen from the source kinesis stream to the sink kinesis stream, whenever the sink kinesis stream could not handle the volume, it adds back pressure. Could someone illustrate a bit more why flink connector is designed in such a way. Also correct me if I stated anything wrong.

 

 

Gavin Liu             

 


Re: Backpressure from producer with flink connector kinesis 1.4.2

Liu, Gavin (CAI - Atlanta)

Hi guys,

 

I have another question related to the KPL problem. I wonder what the consequences of overwhelming the KPL's internal Kinesis queue can be.

 

From my observations experimenting with 1.4.2 (which does not yet have the backpressure support from the open PR discussed earlier), when the Flink cluster processes data too fast and the throughput of the sink Kinesis stream is limited, i.e., throughput-exceeded exceptions start to be thrown, we quite often get the exception pasted at the end very soon afterwards, and all the subtasks switch to cancelling and are restarted.

From the exception trace, I can see that YARN got shut down and all task managers were terminated. I suspect it is a memory issue. Whenever the throughput-exceeded exception is thrown, it implies that the internal unbounded queue in the KPL may grow rapidly. We set recordTtl = 60s and still see record-expiration exceptions along with the throughput-exceeded exceptions. This leads me to wonder whether the internal unbounded queue grows too large, exhausts all the memory on the node, and eventually crashes YARN and the job manager.

 

Well, this is just my hypothesis. I wonder if someone has already encountered or investigated similar issues and could shed some light on it.
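
For context, a sketch of how a recordTtl like the one above is typically set on the 1.4.x producer; it assumes KPL-native keys such as RecordTtl (in milliseconds) are forwarded from the producer Properties to the KPL configuration, and the region and stream name are placeholders:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

    public class ProducerConfigSketch {
        public static FlinkKinesisProducer<String> buildSink() {
            Properties producerProps = new Properties();
            producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");  // placeholder region
            producerProps.setProperty("RecordTtl", "60000");                        // 60 s; the KPL expects milliseconds
            producerProps.setProperty("RecordMaxBufferedTime", "1000");             // example KPL setting, not from the thread

            FlinkKinesisProducer<String> sink =
                    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerProps);
            sink.setDefaultStream("sink-stream");  // placeholder stream name
            sink.setDefaultPartition("0");
            return sink;
        }
    }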

 

 

 

 

java.lang.Exception: TaskManager was lost/killed: container_1529095945616_0009_01_000004 @ ip-172-31-64-249.ec2.internal (dataPort=44591)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:426)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

 

 

 

 

 

 

From: "Liu, Gavin (CAI - Atlanta)" <[hidden email]>
Date: Wednesday, June 20, 2018 at 12:11 PM
To: "Tzu-Li (Gordon) Tai" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

 

Thanks, Gordon. You are quick and It is very helpful to me.

I tried some other alternatives to resolve this, finally thought about rewriting the FlinkKinesisProducer class for our need. Glad that I asked before I started.

Really appreciate the quick response.

 

From: "Tzu-Li (Gordon) Tai" <[hidden email]>
Date: Wednesday, June 20, 2018 at 12:05 PM
To: "Liu, Gavin (CAI - Atlanta)" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

 

 

The problem is that the Kinesis producer currently does not propagate backpressure properly.

Records are added to the internally used KPL client’s queue, without any queue size limit.

 

This is considered a bug, and already has a pull request for it [1], which we should probably push towards being merged soon.

What the pull request essentially does, is adding an upper bound to the number pending records in the KPL producer queue.

Once the upper bound is hit, input to the Kinesis producer sink is blocked, and therefore propagating backpressure further upstream.

 

Cheers,

Gordon

 

[1] https://github.com/apache/flink/pull/6021

 

 

On 20 June 2018 at 6:00:30 PM, Liu, Gavin (CAI - Atlanta) ([hidden email]) wrote:

Hi guys,

 

I am new to flink framework. And we are building an application that takes kinesis stream for both flink source and sink.

The flink version we are using is 1.4.2, which is also the version for the flink-connector-kinesis. We built the flink-connector-kinesis jar explicitly with KPL version 0.12.6 due to the existing problems with default 0.12.5.

 

I get a rough idea how the backpressure works with flink through reading http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CF8DD76C0-9DE0-412A-8C24-B72AF0D4211B@...%3E

 

From my experiment with flink and flink-connector-kinesis, the back pressure only happens within flink processing operations, i.e., not in the flink producer to kinesis.

More specifically, when the throughput from KPL exceeds the kinesis throughput limitations, flink does not slow down at all, i.e., it does not add pressure on the processing chain up to the flink consumer.

Correct me if I misunderstood this. It looks like the flink producer (in the flink-connector-kinesis) is a standalone component, once a record is collected and sent to the producer, flink core finishes all the processing and does not care the fate of the record any more, it is the responsibility of the connector to continue the job.

I am expecting back pressure to happen from the source kinesis stream to the sink kinesis stream, whenever the sink kinesis stream could not handle the volume, it adds back pressure. Could someone illustrate a bit more why flink connector is designed in such a way. Also correct me if I stated anything wrong.

 

 

Gavin Liu             

 


Re: Backpressure from producer with flink connector kinesis 1.4.2

Tzu-Li (Gordon) Tai
Hi,

According to the description in [1], yes, I think it is expected that the YARN containers running the TMs that execute the producer sink subtasks will eventually be killed due to memory problems.
It seems that the KPL client is only a wrapper around a C++ daemon process, so it actually wouldn’t be the TM Java process itself running out of memory and throwing an OOM, which would explain why the TMs were apparently silently killed, as shown by the exception you pasted.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/6021


 


Re: Backpressure from producer with flink connector kinesis 1.4.2

Liu, Gavin (CAI - Atlanta)

Thanks, Gordon. Glad to hear you confirm this. I learned a lot from the open PR, by the way.

Apart from adding backpressure support in the producer, is there any other way to protect YARN from crashing, e.g., through configuration?

 

From: "Tzu-Li (Gordon) Tai" <[hidden email]>
Date: Thursday, June 21, 2018 at 5:09 AM
To: "Liu, Gavin (CAI - Atlanta)" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

 

Hi,

 

According to the description in [1], then yes, I think it is expected that eventually YARN containers running TMs that execute the producer sink subtasks will be killed due to memory problems.

It seems like that KPL client is only a wrapper around a C++ daemon process, so it actually wouldn’t be the TM Java processes running out of memory and throwing OOMs, which would explain why the TMs were apparently silently killed as shown by the exception you pasted.

 

Cheers,

Gordon

 

[1] https://github.com/apache/flink/pull/6021

On 21 June 2018 at 5:56:02 AM, Liu, Gavin (CAI - Atlanta) ([hidden email]) wrote:

Hi guys,

 

I have another question related to the KPL problem. I wonder what the consequences of overwhelming KPL internal queue (kinesis) can be.

 

From my observation in experimenting with 1.4.2 (which does not have backpressure support yet in the open pr stated below), when the flink cluster is processing too fast and the throughput on the sink kinesis stream is limited, i.e, the throughput exceeding exception starts to be thrown, we quite often get the following exception (pasted in the end) very soon and all the subtasks switching status to cancelling and restarted.

From the exception trace, I can see that yarn got shutdown and all task managers are terminated. I suspect it is because of the memory issue. Whenever the throughput exceeding exception is thrown, it implicitly means that the internal unbounded queue in KPL may grow rapidly. we set the recordTtl = 60s and we can still see the record expiration exception along with exceeded throughput exception. Which leads me to wonder that if the internal unbounded queue grows too large and exhaust all the memory in the node and eventually crashing the yarn and the job manager.

 

Well, This is just my hypothesis. I wonder if someone has already encountered or investigated similar issues and could shed some light on it.

 

 

 

 

java.lang.Exception: TaskManager was lost/killed: container_1529095945616_0009_01_000004 @ ip-172-31-64-249.ec2.internal (dataPort=44591)

    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:426)

    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)

    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

    at akka.actor.ActorCell.invoke(ActorCell.scala:495)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

    at akka.dispatch.Mailbox.run(Mailbox.scala:224)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

 

 

 

 

 

 

From: "Liu, Gavin (CAI - Atlanta)" <[hidden email]>
Date: Wednesday, June 20, 2018 at 12:11 PM
To: "Tzu-Li (Gordon) Tai" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

 

Thanks, Gordon. You are quick and It is very helpful to me.

I tried some other alternatives to resolve this, finally thought about rewriting the FlinkKinesisProducer class for our need. Glad that I asked before I started.

Really appreciate the quick response.

 

From: "Tzu-Li (Gordon) Tai" <[hidden email]>
Date: Wednesday, June 20, 2018 at 12:05 PM
To: "Liu, Gavin (CAI - Atlanta)" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

 

 

The problem is that the Kinesis producer currently does not propagate backpressure properly.

Records are added to the internally used KPL client’s queue, without any queue size limit.

 

This is considered a bug, and already has a pull request for it [1], which we should probably push towards being merged soon.

What the pull request essentially does, is adding an upper bound to the number pending records in the KPL producer queue.

Once the upper bound is hit, input to the Kinesis producer sink is blocked, and therefore propagating backpressure further upstream.

 

Cheers,

Gordon

 

[1] https://github.com/apache/flink/pull/6021

 

 

On 20 June 2018 at 6:00:30 PM, Liu, Gavin (CAI - Atlanta) ([hidden email]) wrote:

Hi guys,

 

I am new to flink framework. And we are building an application that takes kinesis stream for both flink source and sink.

The flink version we are using is 1.4.2, which is also the version for the flink-connector-kinesis. We built the flink-connector-kinesis jar explicitly with KPL version 0.12.6 due to the existing problems with default 0.12.5.

 

I get a rough idea how the backpressure works with flink through reading http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CF8DD76C0-9DE0-412A-8C24-B72AF0D4211B@...%3E

 

From my experiment with flink and flink-connector-kinesis, the back pressure only happens within flink processing operations, i.e., not in the flink producer to kinesis.

More specifically, when the throughput from KPL exceeds the kinesis throughput limitations, flink does not slow down at all, i.e., it does not add pressure on the processing chain up to the flink consumer.

Correct me if I misunderstood this. It looks like the flink producer (in the flink-connector-kinesis) is a standalone component, once a record is collected and sent to the producer, flink core finishes all the processing and does not care the fate of the record any more, it is the responsibility of the connector to continue the job.

I am expecting back pressure to happen from the source kinesis stream to the sink kinesis stream, whenever the sink kinesis stream could not handle the volume, it adds back pressure. Could someone illustrate a bit more why flink connector is designed in such a way. Also correct me if I stated anything wrong.

 

 

Gavin Liu