kinesis producer setCustomPartitioner use stream's own data

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

kinesis producer setCustomPartitioner use stream's own data

Sathi Chowdhury

Hi flink users and experts,

 

In my flink processor I am trying to use Flink Kinesis connector . I read from a kinesis stream , and After the transformation (for which I use RichCoFlatMapFunction), json event needs to sink to a kinesis stream k1.

DataStream<ObjectNode> myStream = see.addSource(new FlinkKinesisConsumer<>(inputStream, new MyDeserializationSchema(), consumerConfig));

 

 

For setting up the producer including partitioning I want to use setCustompartitioner , but the problem is that I don’t know how to access a parameters inside myStream , I have multiple fields that I want to extract from the stream  right there in the main method and use them in deciding the partition key. is possible to choose a partition key that is prepared from the stream ? if so can you please share an example.

 

 


kinesis.setCustomPartitioner(
new KinesisPartitioner<String>() {
   
@Override
   
public String getPartitionId(String element) {
       
int l = element.length();   /// here I want to bring values extracted from the stream
       
return element.substring(l - 1, l);
    }
});

 

Thanks

Sathi

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============
Reply | Threaded
Open this post in threaded view
|

Re: kinesis producer setCustomPartitioner use stream's own data

Tzu-Li (Gordon) Tai
Hi Sathi,

The `getPartitionId` method is invoked with each record from the stream. In there, you can extract values / fields from the record, and use that to determine the target partition id.

Is this what you had in mind?

Cheers,
Gordon

On February 21, 2017 at 11:54:21 AM, Sathi Chowdhury ([hidden email]) wrote:

Hi flink users and experts,

 

In my flink processor I am trying to use Flink Kinesis connector . I read from a kinesis stream , and After the transformation (for which I use RichCoFlatMapFunction), json event needs to sink to a kinesis stream k1.

DataStream<ObjectNode> myStream = see.addSource(new FlinkKinesisConsumer<>(inputStream, new MyDeserializationSchema(), consumerConfig));

 

 

For setting up the producer including partitioning I want to use setCustompartitioner , but the problem is that I don’t know how to access a parameters inside myStream , I have multiple fields that I want to extract from the stream  right there in the main method and use them in deciding the partition key. is possible to choose a partition key that is prepared from the stream ? if so can you please share an example.

 

 


kinesis.setCustomPartitioner(
new KinesisPartitioner<String>() {
   
@Override
   
public String getPartitionId(String element) {
       
int l = element.length();   /// here I want to bring values extracted from the stream
       
return element.substring(l - 1, l);
    }
});

 

Thanks

Sathi

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============
Reply | Threaded
Open this post in threaded view
|

Re: kinesis producer setCustomPartitioner use stream's own data

Sathi Chowdhury

Thanks Gordon,

It was simple to resolve.

Best,
Sathi

 

From: "Tzu-Li (Gordon) Tai" <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Monday, February 20, 2017 at 11:46 PM
To: "[hidden email]" <[hidden email]>
Subject: Re: kinesis producer setCustomPartitioner use stream's own data

 

Hi Sathi,

 

The `getPartitionId` method is invoked with each record from the stream. In there, you can extract values / fields from the record, and use that to determine the target partition id.

 

Is this what you had in mind?

 

Cheers,

Gordon

 

On February 21, 2017 at 11:54:21 AM, Sathi Chowdhury ([hidden email]) wrote:

Hi flink users and experts,

 

In my flink processor I am trying to use Flink Kinesis connector . I read from a kinesis stream , and After the transformation (for which I use RichCoFlatMapFunction), json event needs to sink to a kinesis stream k1.

DataStream<ObjectNode> myStream = see.addSource(new FlinkKinesisConsumer<>(inputStream, new MyDeserializationSchema(), consumerConfig));

 

 

For setting up the producer including partitioning I want to use setCustompartitioner , but the problem is that I don’t know how to access a parameters inside myStream , I have multiple fields that I want to extract from the stream  right there in the main method and use them in deciding the partition key. is possible to choose a partition key that is prepared from the stream ? if so can you please share an example.

 

 


kinesis.setCustomPartitioner(
new KinesisPartitioner<String>() {
   
@Override
   
public String getPartitionId(String element) {
       
int l = element.length();   /// here I want to bring values extracted from the stream
       
return element.substring(l - 1, l);
    }
});

 

Thanks

Sathi

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============