ExpiredIteratorException when reading from a Kinesis stream

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

ExpiredIteratorException when reading from a Kinesis stream

Steffen Hausmann
Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen

flink-expirediteratorexception.log (223K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: ExpiredIteratorException when reading from a Kinesis stream

Tzu-Li (Gordon) Tai
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can also build on the current “release-1.1” branch, which already has FLINK-4514 merged.
Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann ([hidden email]) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen
Reply | Threaded
Open this post in threaded view
|

Re: ExpiredIteratorException when reading from a Kinesis stream

Josh
Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink Kinesis consumer started to stop consuming for periods of time (see the spikes in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can also build on the current “release-1.1” branch, which already has FLINK-4514 merged.
Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann ([hidden email]) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen


Screen Shot 2016-11-03 at 09.53.53.png (43K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: ExpiredIteratorException when reading from a Kinesis stream

Tzu-Li (Gordon) Tai
Hi Josh,

That warning message was added as part of FLINK-4514. It pops out whenever a shard iterator was used after 5 minutes it was returned from Kinesis.
The only time spent between after a shard iterator was returned and before it was used to fetch the next batch of records, is on deserializing and emitting of the records of the last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this normally shouldn’t happen.

Have you noticed any sign of long, constant full GC for your Flink task managers? From your description and check in code, the only possible guess I can come up with now is that
the source tasks completely seized to be running for a period of time, and when it came back, the shard iterator was unexpectedly found to be expired. According to the graph you attached,
when the iterator was refreshed and tasks successfully fetched a few more batches, the source tasks again halted, and so on.
So you should see that same warning message right before every small peak within the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh ([hidden email]) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink Kinesis consumer started to stop consuming for periods of time (see the spikes in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can also build on the current “release-1.1” branch, which already has FLINK-4514 merged.
Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann ([hidden email]) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen

Reply | Threaded
Open this post in threaded view
|

Re: ExpiredIteratorException when reading from a Kinesis stream

Josh
Hi Gordon,

Thanks for the fast reply! 
You're right about the expired iterator exception occurring just before each spike. I can't see any signs of long GC on the task managers... CPU has been <15% the whole time when the spikes were taking place and I can't see anything unusual in the task manager logs.

But actually I just noticed that the Flink UI showed no successful checkpoints during the time of the problem even though my checkpoint interval is 15 minutes. So I guess this is probably some kind of Flink problem rather than a problem with the Kinesis consumer. Unfortunately I can't find anything useful in the logs so not sure what happened!

Josh



On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Josh,

That warning message was added as part of FLINK-4514. It pops out whenever a shard iterator was used after 5 minutes it was returned from Kinesis.
The only time spent between after a shard iterator was returned and before it was used to fetch the next batch of records, is on deserializing and emitting of the records of the last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this normally shouldn’t happen.

Have you noticed any sign of long, constant full GC for your Flink task managers? From your description and check in code, the only possible guess I can come up with now is that
the source tasks completely seized to be running for a period of time, and when it came back, the shard iterator was unexpectedly found to be expired. According to the graph you attached,
when the iterator was refreshed and tasks successfully fetched a few more batches, the source tasks again halted, and so on.
So you should see that same warning message right before every small peak within the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh ([hidden email]) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink Kinesis consumer started to stop consuming for periods of time (see the spikes in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can also build on the current “release-1.1” branch, which already has FLINK-4514 merged.
Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann ([hidden email]) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen


Reply | Threaded
Open this post in threaded view
|

Re: ExpiredIteratorException when reading from a Kinesis stream

Scott Kidder
Hi Steffan & Josh,

For what it's worth, I've been using the Kinesis connector with very good results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL and AWS SDK dependencies to the following versions:

aws.sdk.version: 1.11.34
aws.kinesis-kcl.version: 1.7.0

My customizations are visible in this commit on my fork:

It might be worth testing with newer AWS SDK & KCL libraries to see if the problem persists.

Best,

--Scott Kidder


On Thu, Nov 3, 2016 at 7:08 AM, Josh <[hidden email]> wrote:
Hi Gordon,

Thanks for the fast reply! 
You're right about the expired iterator exception occurring just before each spike. I can't see any signs of long GC on the task managers... CPU has been <15% the whole time when the spikes were taking place and I can't see anything unusual in the task manager logs.

But actually I just noticed that the Flink UI showed no successful checkpoints during the time of the problem even though my checkpoint interval is 15 minutes. So I guess this is probably some kind of Flink problem rather than a problem with the Kinesis consumer. Unfortunately I can't find anything useful in the logs so not sure what happened!

Josh



On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Josh,

That warning message was added as part of FLINK-4514. It pops out whenever a shard iterator was used after 5 minutes it was returned from Kinesis.
The only time spent between after a shard iterator was returned and before it was used to fetch the next batch of records, is on deserializing and emitting of the records of the last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this normally shouldn’t happen.

Have you noticed any sign of long, constant full GC for your Flink task managers? From your description and check in code, the only possible guess I can come up with now is that
the source tasks completely seized to be running for a period of time, and when it came back, the shard iterator was unexpectedly found to be expired. According to the graph you attached,
when the iterator was refreshed and tasks successfully fetched a few more batches, the source tasks again halted, and so on.
So you should see that same warning message right before every small peak within the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh ([hidden email]) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink Kinesis consumer started to stop consuming for periods of time (see the spikes in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can also build on the current “release-1.1” branch, which already has FLINK-4514 merged.
Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann ([hidden email]) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen



Reply | Threaded
Open this post in threaded view
|

Re: ExpiredIteratorException when reading from a Kinesis stream

Stephan Ewen
Is it possible that you have stalls in your topology?

Reasons could be:

  - The data sink blocks or becomes slow for some periods (where are you sending the data to?)

  - If you are using large state and a state backend that only supports synchronous checkpointing, there may be a delay introduced by the checkpoint


On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder <[hidden email]> wrote:
Hi Steffan & Josh,

For what it's worth, I've been using the Kinesis connector with very good results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL and AWS SDK dependencies to the following versions:

aws.sdk.version: 1.11.34
aws.kinesis-kcl.version: 1.7.0

My customizations are visible in this commit on my fork:

It might be worth testing with newer AWS SDK & KCL libraries to see if the problem persists.

Best,

--Scott Kidder


On Thu, Nov 3, 2016 at 7:08 AM, Josh <[hidden email]> wrote:
Hi Gordon,

Thanks for the fast reply! 
You're right about the expired iterator exception occurring just before each spike. I can't see any signs of long GC on the task managers... CPU has been <15% the whole time when the spikes were taking place and I can't see anything unusual in the task manager logs.

But actually I just noticed that the Flink UI showed no successful checkpoints during the time of the problem even though my checkpoint interval is 15 minutes. So I guess this is probably some kind of Flink problem rather than a problem with the Kinesis consumer. Unfortunately I can't find anything useful in the logs so not sure what happened!

Josh



On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Josh,

That warning message was added as part of FLINK-4514. It pops out whenever a shard iterator was used after 5 minutes it was returned from Kinesis.
The only time spent between after a shard iterator was returned and before it was used to fetch the next batch of records, is on deserializing and emitting of the records of the last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this normally shouldn’t happen.

Have you noticed any sign of long, constant full GC for your Flink task managers? From your description and check in code, the only possible guess I can come up with now is that
the source tasks completely seized to be running for a period of time, and when it came back, the shard iterator was unexpectedly found to be expired. According to the graph you attached,
when the iterator was refreshed and tasks successfully fetched a few more batches, the source tasks again halted, and so on.
So you should see that same warning message right before every small peak within the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh ([hidden email]) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink Kinesis consumer started to stop consuming for periods of time (see the spikes in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can also build on the current “release-1.1” branch, which already has FLINK-4514 merged.
Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann ([hidden email]) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen




Reply | Threaded
Open this post in threaded view
|

Re: ExpiredIteratorException when reading from a Kinesis stream

Josh
Hi Scott & Stephan,

The problem has happened a couple more times since yesterday, it's very strange as my job was running fine for over a week before this started happening. I find that if I restart the job (and restore from the last checkpoint) it runs fine for a while (couple of hours) before breaking again.

@Scott thanks, I'll try testing with the upgraded versions, though since my job was running fine for over a week it feels like there might be something else going on here.

@Stephan I see, my sink is a Kafka topic. I only have two nodes in my Kafka cluster and CPU and memory usage seems normal on both nodes. I can't see anything bad in the task manager logs relating to the Kafka producer either. I do have a fairly large state (20GB) but I'm using the latest RocksDB state backend (with asynchronous checkpointing). I'm not sure what else I can do to investigate this, but let me know if you have any more ideas!

Thanks,
Josh
 

On Thu, Nov 3, 2016 at 6:27 PM, Stephan Ewen <[hidden email]> wrote:
Is it possible that you have stalls in your topology?

Reasons could be:

  - The data sink blocks or becomes slow for some periods (where are you sending the data to?)

  - If you are using large state and a state backend that only supports synchronous checkpointing, there may be a delay introduced by the checkpoint


On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder <[hidden email]> wrote:
Hi Steffan & Josh,

For what it's worth, I've been using the Kinesis connector with very good results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL and AWS SDK dependencies to the following versions:

aws.sdk.version: 1.11.34
aws.kinesis-kcl.version: 1.7.0

My customizations are visible in this commit on my fork:

It might be worth testing with newer AWS SDK & KCL libraries to see if the problem persists.

Best,

--Scott Kidder


On Thu, Nov 3, 2016 at 7:08 AM, Josh <[hidden email]> wrote:
Hi Gordon,

Thanks for the fast reply! 
You're right about the expired iterator exception occurring just before each spike. I can't see any signs of long GC on the task managers... CPU has been <15% the whole time when the spikes were taking place and I can't see anything unusual in the task manager logs.

But actually I just noticed that the Flink UI showed no successful checkpoints during the time of the problem even though my checkpoint interval is 15 minutes. So I guess this is probably some kind of Flink problem rather than a problem with the Kinesis consumer. Unfortunately I can't find anything useful in the logs so not sure what happened!

Josh



On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Josh,

That warning message was added as part of FLINK-4514. It pops out whenever a shard iterator was used after 5 minutes it was returned from Kinesis.
The only time spent between after a shard iterator was returned and before it was used to fetch the next batch of records, is on deserializing and emitting of the records of the last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this normally shouldn’t happen.

Have you noticed any sign of long, constant full GC for your Flink task managers? From your description and check in code, the only possible guess I can come up with now is that
the source tasks completely seized to be running for a period of time, and when it came back, the shard iterator was unexpectedly found to be expired. According to the graph you attached,
when the iterator was refreshed and tasks successfully fetched a few more batches, the source tasks again halted, and so on.
So you should see that same warning message right before every small peak within the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh ([hidden email]) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink Kinesis consumer started to stop consuming for periods of time (see the spikes in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can also build on the current “release-1.1” branch, which already has FLINK-4514 merged.
Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann ([hidden email]) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen





Reply | Threaded
Open this post in threaded view
|

Re: ExpiredIteratorException when reading from a Kinesis stream

Josh
I've reset the state and the job appears to be running smoothly again now. My guess is that this problem was somehow related to my state becoming too large (it got to around 20GB before the problem began). I would still like to get to the bottom of what caused this as resetting the job's state is not a long term solution, so if anyone has any ideas I can restore the large state and investigate further.

Btw sorry for going a bit off topic on this thread!

On Fri, Nov 4, 2016 at 11:19 AM, Josh <[hidden email]> wrote:
Hi Scott & Stephan,

The problem has happened a couple more times since yesterday, it's very strange as my job was running fine for over a week before this started happening. I find that if I restart the job (and restore from the last checkpoint) it runs fine for a while (couple of hours) before breaking again.

@Scott thanks, I'll try testing with the upgraded versions, though since my job was running fine for over a week it feels like there might be something else going on here.

@Stephan I see, my sink is a Kafka topic. I only have two nodes in my Kafka cluster and CPU and memory usage seems normal on both nodes. I can't see anything bad in the task manager logs relating to the Kafka producer either. I do have a fairly large state (20GB) but I'm using the latest RocksDB state backend (with asynchronous checkpointing). I'm not sure what else I can do to investigate this, but let me know if you have any more ideas!

Thanks,
Josh
 

On Thu, Nov 3, 2016 at 6:27 PM, Stephan Ewen <[hidden email]> wrote:
Is it possible that you have stalls in your topology?

Reasons could be:

  - The data sink blocks or becomes slow for some periods (where are you sending the data to?)

  - If you are using large state and a state backend that only supports synchronous checkpointing, there may be a delay introduced by the checkpoint


On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder <[hidden email]> wrote:
Hi Steffan & Josh,

For what it's worth, I've been using the Kinesis connector with very good results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL and AWS SDK dependencies to the following versions:

aws.sdk.version: 1.11.34
aws.kinesis-kcl.version: 1.7.0

My customizations are visible in this commit on my fork:

It might be worth testing with newer AWS SDK & KCL libraries to see if the problem persists.

Best,

--Scott Kidder


On Thu, Nov 3, 2016 at 7:08 AM, Josh <[hidden email]> wrote:
Hi Gordon,

Thanks for the fast reply! 
You're right about the expired iterator exception occurring just before each spike. I can't see any signs of long GC on the task managers... CPU has been <15% the whole time when the spikes were taking place and I can't see anything unusual in the task manager logs.

But actually I just noticed that the Flink UI showed no successful checkpoints during the time of the problem even though my checkpoint interval is 15 minutes. So I guess this is probably some kind of Flink problem rather than a problem with the Kinesis consumer. Unfortunately I can't find anything useful in the logs so not sure what happened!

Josh



On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Josh,

That warning message was added as part of FLINK-4514. It pops out whenever a shard iterator was used after 5 minutes it was returned from Kinesis.
The only time spent between after a shard iterator was returned and before it was used to fetch the next batch of records, is on deserializing and emitting of the records of the last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this normally shouldn’t happen.

Have you noticed any sign of long, constant full GC for your Flink task managers? From your description and check in code, the only possible guess I can come up with now is that
the source tasks completely seized to be running for a period of time, and when it came back, the shard iterator was unexpectedly found to be expired. According to the graph you attached,
when the iterator was refreshed and tasks successfully fetched a few more batches, the source tasks again halted, and so on.
So you should see that same warning message right before every small peak within the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh ([hidden email]) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink Kinesis consumer started to stop consuming for periods of time (see the spikes in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can also build on the current “release-1.1” branch, which already has FLINK-4514 merged.
Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann ([hidden email]) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen