put record to kinesis and then trying consume using flink connector


Sathi Chowdhury

Hi Flink Dev,

I thought this would work easily with Flink, and it is simple enough, yet I am struggling to make it work.

I am using the Flink Kinesis connector, version 1.3-SNAPSHOT.

Basically I am trying to bootstrap a stream with one warmup event pushed into it from the Flink job's main method: I use the AWS Kinesis client to simply putRecord into a given stream.

My expectation is that if I then addSource a Kinesis consumer on that stream, the resulting data stream will consume the event I pushed.

    // This is the method that pushes to the Kinesis stream "mystream"
    publishToKinesis("mystream", regionName, data);
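
For reference, a minimal sketch of what such a publish helper might look like with the AWS SDK v1 Kinesis client; the client setup and partition-key choice below are illustrative assumptions, not code from the post:

    import com.amazonaws.services.kinesis.AmazonKinesis;
    import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
    import com.amazonaws.services.kinesis.model.PutRecordRequest;
    import com.amazonaws.services.kinesis.model.PutRecordResult;
    import java.nio.ByteBuffer;
    import java.util.UUID;

    static void publishToKinesis(String streamName, String regionName, byte[] data) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
                .withRegion(regionName)
                .build();
        PutRecordRequest request = new PutRecordRequest()
                .withStreamName(streamName)
                .withPartitionKey(UUID.randomUUID().toString()) // any well-distributed key will do
                .withData(ByteBuffer.wrap(data));
        PutRecordResult result = kinesis.putRecord(request);
        // the shard id and sequence number here are what the producer-side log below reports
        System.out.println("Published to " + result.getShardId()
                + " at sequence number " + result.getSequenceNumber());
    }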

 

 

    Properties consumerConfig = new Properties();
    consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    // STREAM_INITIAL_POSITION is the config key; the DEFAULT_STREAM_INITIAL_POSITION
    // constant used in the original snippet is that key's default value, not the key itself
    consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
            ConsumerConfigConstants.InitialPosition.LATEST.toString());
    consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

    final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
    cluster.start();
    ObjectMapper mapper = new ObjectMapper();

    final StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment(
            "localhost", cluster.getLeaderRPCPort());

    DataStream<String> outputStream = see.addSource(
            new FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));

    for (Iterator<String> it = DataStreamUtils.collect(outputStream); it.hasNext(); ) {
        String actualOut = it.next();
        ObjectNode actualOutNode = (ObjectNode) mapper.readTree(actualOut);
        // then I want to either print it or do some further validation etc.
    }
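
MyDeserializerSchema is not shown in the post; for a plain UTF-8 payload, a minimal stand-in could look like the sketch below (the DeserializationSchema package shown is its Flink 1.3-era location; it moved in later releases):

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class MyDeserializerSchema implements DeserializationSchema<String> {
        @Override
        public String deserialize(byte[] message) {
            // the connector hands over the raw Kinesis record bytes; decode them as UTF-8 text
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false; // a Kinesis stream is unbounded, so never signal end-of-stream
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }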

 

 

……..

 

I am not sure why the FlinkKinesisConsumer does not react to the record I published; it keeps waiting at the it.next() step.

I print out the sequence number the record was put at:

2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: shardId-000000000000Sequence number49572539577762637793132019619873654976833479400677703682 Stream Name:mystream

 

 

And the Flink job logs this at the end, where it keeps waiting:

2017-04-22 19:47:29,423 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: Subtask 0 will be seeded with initial shard KinesisStreamShard{streamName='mystream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572531519165352852103352736022695959324427654906511362,}}'}, starting state set as sequence number LATEST_SEQUENCE_NUM

2017-04-22 19:47:29,425 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: Subtask 0 will start consuming seeded shard KinesisStreamShard{streamName='mystream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572531519165352852103352736022695959324427654906511362,}}'} from sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0

 

Any clue would be awesome to clear up my confusion.

Thanks

Sathi

Re: put record to kinesis and then trying consume using flink connector

Tzu-Li (Gordon) Tai
Hi Sathi,

Here, in the producer-side log, it says:

2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: shardId-000000000000Sequence number49572539577762637793132019619873654976833479400677703682 Stream Name:mystream

The stream the record was inserted into is “mystream”.

However,

            DataStream<String> outputStream = see.addSource(new FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));

you seem to be consuming from “myStream”.

Could the capital “S” be the issue?

Cheers,
Gordon


Re: put record to kinesis and then trying consume using flink connector

Sathi Chowdhury

Hi Gordon,

That was a typo; I was masking the stream name. I still had issues using LATEST as the initial stream position, so I moved to AT_TIMESTAMP, which solved it; it works fine now.

Thanks so much for your response.

Sathi
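
For readers following along, the AT_TIMESTAMP setup that solved this would look roughly like the sketch below; the timestamp value shown is an illustrative assumption (the connector also accepts a formatted date string, configurable via STREAM_TIMESTAMP_DATE_FORMAT):

    consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
            ConsumerConfigConstants.InitialPosition.AT_TIMESTAMP.toString());
    // epoch time in seconds (as a double), captured at or before the warmup putRecord call;
    // records written at or after this instant are consumed
    consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
            String.valueOf(System.currentTimeMillis() / 1000.0));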

 

From: "Tzu-Li (Gordon) Tai" <[hidden email]>
Date: Sunday, April 23, 2017 at 3:32 PM
To: "[hidden email]" <[hidden email]>
Subject: Re: put record to kinesis and then trying consume using flink connector

 

Hi Sathi,

 

Here, in the producer-side log, it says:

2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: shardId-000000000000Sequence number49572539577762637793132019619873654976833479400677703682 Stream Name:mystream

The stream the record was inserted into is “mystream”.

However,

            DataStream<String> outputStream = see.addSource(new FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));

you seem to be consuming from “myStream”.

Could the capital “S” be the issue?

 

Cheers,

Gordon

 

 

On 24 April 2017 at 12:33:28 AM, Sathi Chowdhury ([hidden email]) wrote:

Hi Flink Dev,

I thought something will work easily with flink and it is simple enough ,yer I am struggling to make it work.

I am using flink kinesis connector ..using 1.3-SNAPSHOT version.

 

Basically I am trying to bootstrap a stream with one event pushed into it as a warmup inside flink job’s main method and I use aws kinesis client to simply putrecord into a given stream.

My expectation is that now if I addSource to a kinesis stream the data stream will consume the event I pushed.

 

 

 

//This is the method that pushes to the kinesis Stream “mystream”

                    publishToKinesis(“mystream”,regionName,data) ;

 

 

                    Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfigConstants.
AWS_REGION, region);
        consumerConfig.setProperty(ConsumerConfigConstants.
DEFAULT_STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.LATEST.toString());
        consumerConfig.setProperty(ConsumerConfigConstants.
AWS_CREDENTIALS_PROVIDER, "AUTO");

       
final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
            cluster.start();
            ObjectMapper mapper =
new ObjectMapper();
           
final StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment(
                   
"localhost", cluster.getLeaderRPCPort());

            DataStream<String> outputStream = see.addSource(
new FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));


           
for (Iterator<String> it = DataStreamUtils.collect(outputStream); it.hasNext(); ) {
                String actualOut = it.next();
                ObjectNode actualOutNode = (ObjectNode) mapper.readTree(actualOut);
                    //then I do want to  either print it or do some further validation etc.                                 
       }

 

 

……..

 

Not sure why the record that I published , FlinkKinesisConsumer is not able to react to it, it keeps waiting for it…at the step it.next();

 

 

I print out the SequenceNumber I put the record at

2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published record, of bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: shardId-000000000000Sequence number49572539577762637793132019619873654976833479400677703682 Stream Name:mystream

 

 

And Flink job is logging this at the end where it is waiting

2017-04-22 19:47:29,423 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: Subtask 0 will be seeded with initial shard KinesisStreamShard{streamName='mystream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572531519165352852103352736022695959324427654906511362,}}'}, starting state set as sequence number LATEST_SEQUENCE_NUM

2017-04-22 19:47:29,425 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: Subtask 0 will start consuming seeded shard KinesisStreamShard{streamName=’mystream’, shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572531519165352852103352736022695959324427654906511362,}}'} from sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0

 

Any clue will be awesome to clear my confusion.

Thanks

Sathi

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============
Re: put record to kinesis and then trying consume using flink connector

Sathi Chowdhury

Hi,

I also had a question: how long does data that you broadcast on a (non-changing) stream stay available in the operator's JVM? Will it be for as long as the operator is alive?

What happens when a slot dies? Does the new slot automatically become aware of the broadcast data?

Thanks

Sathi

 

Re: put record to kinesis and then trying consume using flink connector

Alex Reid
Hi Sathi,

I believe the issue is that you pushed the event into the stream and then started the consumer app afterwards. If you push an event into the Kinesis stream before starting a reader whose initial stream position is LATEST, the reader will not see that record, because you told it to start reading from the time the consumer app started. LATEST does not mean "read the last event that was pushed into the stream"; it basically means start reading from "now", i.e. from consumer app start.

- alex
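
In other words, to pick up records that were already in the stream before the job starts consuming, the initial position has to point into the past. A sketch using the connector's TRIM_HORIZON position (start from the oldest record the stream still retains):

    consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
            ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.toString());
    // TRIM_HORIZON replays from the oldest retained record, so a warmup record
    // published before the job started is read instead of being skipped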


Re: put record to kinesis and then trying consume using flink connector

Sathi Chowdhury

Thanks Alex.

Yes, exactly so. I was actually aware that this is challenging to do in the main method of a Flink job: even though the push appears after my data stream is attached to Kinesis, due to lazy execution the source only starts consuming once the job actually runs, which is after my publish, so the publish did not work.

If the publish is done by an outside service, then obviously there are no issues.

 

Re: put record to kinesis and then trying consume using flink connector

Tzu-Li (Gordon) Tai
Hi Sathi,

Just curious: you mentioned that you’re writing some records in the main method of your job application, I assume that this is just for testing purposes, correct? If so, you can perhaps just use “EARLIEST” as the starting position. Or “AT_TIMESTAMP”, as you are currently doing.

And yes, you're correct in observing that the job transformations are lazily executed, so the Kinesis consumer connects to your Kinesis stream only after anything that happens in the main method.
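
A comment-only sketch of the timeline this implies, using the code from the first post:

    publishToKinesis("mystream", regionName, data);        // the record enters the stream now
    DataStream<String> outputStream = see.addSource(...);  // only builds the job graph, nothing runs yet
    DataStreamUtils.collect(outputStream);                 // the job starts consuming HERE; with LATEST,
                                                           // "start from now" skips the record above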

Also, regarding one of your earlier questions:

I also had a question: how long does data that you broadcast on a (non-changing) stream stay available in the operator's JVM? Will it be for as long as the operator is alive?

What happens when a slot dies? Does the new slot automatically become aware of the broadcast data?

I’m not sure what you mean here. Could you elaborate a bit more?

Cheers,
Gordon

