Subtask keeps on discovering new Kinesis shard when using Kinesalite

Philipp Bussche
Hi there,
I am looking into AWS Kinesis and wanted to test with a local install of Kinesalite, on Flink 1.2-SNAPSHOT. However, it looks like my subtask keeps discovering new shards, as indicated by the following log message, which is written constantly:

21:45:42,867 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 has discovered a new shard KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49567668736415295912662799644604754736261244425831710722,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 4

A new one is discovered every 10 seconds or so. I have set parallelism to 1 and checkpointing to 5000 ms.

This does not happen when I am running directly against AWS.

I wonder whether this is really something inside Flink or rather in Kinesalite. However, the part of the Flink connector code where this log message is written is heavily documented with FLINK-4341, so I am wondering if this is maybe a side effect?

Thanks
Philipp
Re: Subtask keeps on discovering new Kinesis shard when using Kinesalite

Tzu-Li (Gordon) Tai
Hi Philipp,

When used against Kinesalite, can you tell if the connector is already reading data from the test shard before any
of the shard discovery messages? If you have any spare time to test this, you can set a larger value for
`ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the config properties to make this easier to test.
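
A minimal sketch of that setting, assuming the consumer is configured through a plain `java.util.Properties` object. The key string is written out literally so the snippet runs without the connector on the classpath; it is assumed to be the value behind `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` and should be checked against the connector version in use. The class name, method name and the 30-second value are illustrative only.

```java
import java.util.Properties;

public class DiscoveryIntervalConfig {
    // Assumed literal value of ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
    // prefer the constant itself when the connector is on the classpath.
    public static final String SHARD_DISCOVERY_INTERVAL_MILLIS =
            "flink.shard.discovery.intervalmillis";

    /** Builds consumer properties with a custom shard discovery interval. */
    public static Properties consumerConfig(long discoveryIntervalMillis) {
        Properties config = new Properties();
        config.setProperty(SHARD_DISCOVERY_INTERVAL_MILLIS,
                Long.toString(discoveryIntervalMillis));
        return config;
    }

    public static void main(String[] args) {
        // A larger interval leaves more time to check whether data is read
        // before the next discovery message appears.
        Properties config = consumerConfig(30_000L);
        System.out.println(config.getProperty(SHARD_DISCOVERY_INTERVAL_MILLIS));
    }
}
```

The resulting properties would then be passed to the `FlinkKinesisConsumer` constructor together with the stream name and the deserialization schema.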

If yes, I suspect the problem is that Kinesalite doesn’t stick to the actual Kinesis behaviour for some of the APIs.
Specifically, I think the problem is with the `describeStream(streamName, lastSeenShardId)` Kinesis API, where the
expected behaviour is that the returned shard list only contains shardIds after `lastSeenShardId`. Perhaps Kinesalite
doesn’t follow this behaviour. That would explain why the connector keeps treating the same shard as newly discovered.
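
To illustrate the suspected mechanism, here is a small simulation, not connector code. `ShardLister` is a hypothetical stand-in for the `describeStream` call, and the discovery loop is a simplified assumption about how a consumer tracks the last seen shard ID. A lister that honours `lastSeenShardId` reports the single shard once, while one that ignores it reports the same shard in every round.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShardDiscoverySketch {
    /** Hypothetical stand-in for describeStream(streamName, lastSeenShardId). */
    public interface ShardLister {
        List<String> listShards(String lastSeenShardId);
    }

    /** Expected behaviour: only shard ids strictly after lastSeenShardId are returned. */
    public static ShardLister compliant(List<String> allShards) {
        return lastSeen -> {
            List<String> result = new ArrayList<>();
            for (String id : allShards) {
                if (lastSeen == null || id.compareTo(lastSeen) > 0) {
                    result.add(id);
                }
            }
            return result;
        };
    }

    /** Suspected mock behaviour: the lastSeenShardId argument is ignored. */
    public static ShardLister ignoringLastSeen(List<String> allShards) {
        return lastSeen -> new ArrayList<>(allShards);
    }

    /** Runs several discovery rounds and counts how many shards are reported as new. */
    public static int countDiscoveries(ShardLister lister, int rounds) {
        String lastSeen = null;
        int discovered = 0;
        for (int i = 0; i < rounds; i++) {
            for (String id : lister.listShards(lastSeen)) {
                discovered++;   // every returned shard is treated as new
                lastSeen = id;  // assumes the list is in ascending shard id order
            }
        }
        return discovered;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shardId-000000000000");
        System.out.println(countDiscoveries(compliant(shards), 3));        // prints 1
        System.out.println(countDiscoveries(ignoringLastSeen(shards), 3)); // prints 3
    }
}
```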

I’ll investigate and try to reproduce the problem, and see if there’s a good way to work around this for Kinesalite.
Thank you for reporting the issue. I’ve filed a JIRA (https://issues.apache.org/jira/browse/FLINK-5075) for this.

Best,
Gordon


Re: Subtask keeps on discovering new Kinesis shard when using Kinesalite

Philipp Bussche
Hello Gordon,

thank you for your help. I have set the discovery interval to 30 seconds and just started the job on a clean Kinesalite service (I am running it inside Docker, so every time the container gets stopped and removed to start from scratch).

This is the output without actually any data in the stream:

11/16/2016 17:59:03 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
17:59:04,673 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 will be seeded with initial shard KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49567694685205596999719397165301965297537316555774230530,}}'}, starting state set as sequence number LATEST_SEQUENCE_NUM
17:59:04,674 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 will start consuming seeded shard KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49567694685205596999719397165301965297537316555774230530,}}'} from sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0
17:59:04,689 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 has discovered a new shard KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49567694685205596999719397165301965297537316555774230530,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
17:59:08,817 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1479315548815
17:59:08,835 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (in 20 ms)
17:59:13,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1479315553815
17:59:13,817 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (in 1 ms)
17:59:18,814 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 3 @ 1479315558814
17:59:18,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 3 (in 1 ms)
17:59:23,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 4 @ 1479315563815
17:59:23,816 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 4 (in 1 ms)
17:59:28,814 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 5 @ 1479315568813
17:59:28,814 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 5 (in 1 ms)
17:59:33,814 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1479315573814
17:59:33,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 (in 1 ms)
17:59:34,704 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 has discovered a new shard KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49567694685205596999719397165301965297537316555774230530,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2

I then restarted the Kinesalite container and posted a message to the stream before the 30-second mark. The output shows that the job consumes right away from the 2 shards discovered initially (I initialized Kinesalite with one shard only) and then also consumes from each new shard as it is discovered at 30-second intervals. (I am posting a string to the stream but my job expects a JSON document, so the parsing fails; look for the JSON output, which I am writing by just doing a job.print()):

Thanks
Philipp

11/16/2016 18:03:30 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
18:03:30,832 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 will be seeded with initial shard KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49567694816512384728667706222664274486890894391754358786,}}'}, starting state set as sequence number LATEST_SEQUENCE_NUM
18:03:30,833 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 will start consuming seeded shard KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49567694816512384728667706222664274486890894391754358786,}}'} from sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0
18:03:30,847 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 has discovered a new shard KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49567694816512384728667706222664274486890894391754358786,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
18:03:34,878 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class de.harvee.dataspa.flink.model.HarveeEventError is not a valid POJO type
18:03:35,093 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1479315815091
{"domainId":null,"title":null,"description":null,"venue":null,"message":"could not parse message"}
{"domainId":null,"title":null,"description":null,"venue":null,"message":"could not parse message"}
18:03:35,191 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (in 100 ms)
18:03:40,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1479315820003
18:03:40,005 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (in 1 ms)
18:03:45,005 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 3 @ 1479315825005
18:03:45,006 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 3 (in 1 ms)
18:03:50,007 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 4 @ 1479315830007
18:03:50,007 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 4 (in 0 ms)
18:03:55,006 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 5 @ 1479315835006
18:03:55,007 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 5 (in 1 ms)
18:04:00,007 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1479315840007
18:04:00,008 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 (in 1 ms)
18:04:00,861 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 has discovered a new shard KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49567694816512384728667706222664274486890894391754358786,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2
{"domainId":null,"title":null,"description":null,"venue":null,"message":"could not parse message"}




Re: Subtask keeps on discovering new Kinesis shard when using Kinesalite

Tzu-Li (Gordon) Tai
Hi Philipp,

Thanks for testing it. From your log and my own tests, I can confirm the problem is with Kinesalite not correctly
mocking the official Kinesis behaviour for the `describeStream` API.

There’s a PR for the fix here: https://github.com/apache/flink/pull/2822. With this change, shard discovery
should work normally when tested against Kinesalite.
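
The thread does not spell out what the patch changes. Purely as an illustration, one way a client could tolerate a mock that ignores the exclusive start shard ID is to drop any returned shard IDs that are not strictly after it. The class and method names below are hypothetical and not taken from the connector, and comparing shard IDs as strings is an assumption based on their zero-padded format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StartShardFilter {
    /**
     * Keeps only shard ids strictly after lastSeenShardId. Ids such as
     * "shardId-000000000001" are zero-padded, so plain string comparison
     * is assumed to match their numeric order.
     */
    public static List<String> onlyNewShards(List<String> returned, String lastSeenShardId) {
        if (lastSeenShardId == null) {
            return new ArrayList<>(returned); // first call: everything is new
        }
        List<String> fresh = new ArrayList<>();
        for (String id : returned) {
            if (id.compareTo(lastSeenShardId) > 0) {
                fresh.add(id);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        List<String> returned =
                Arrays.asList("shardId-000000000000", "shardId-000000000001");
        // Only the shard after the last seen one survives the filter.
        System.out.println(onlyNewShards(returned, "shardId-000000000000"));
    }
}
```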

However, I’m not completely sure yet if the fix is viable, and would like to wait for others to take a look / review.
Therefore, it might not make it into the next Flink minor bugfix release. If you’d like, you can try out the patch for now
and see if the problem remains.

Best Regards,
Gordon

Re: Subtask keeps on discovering new Kinesis shard when using Kinesalite

Philipp Bussche
Hello Gordon

thank you for the patch. I can confirm that discovery looks good now and it does not rediscover shards every few seconds.
I will do more testing with this now, but it looks very good already!

Thanks,
Philipp