FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false?

Vijay Balakrishnan
Hi,
My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false?

flink-connector-kinesis_2.11, Flink version 1.9.1

import java.util.Map;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

// Set up the Kinesis producer
Properties kinesisProducerConfig = new Properties();
kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
//kinesisProducerConfig.setProperty("AggregationEnabled", "false");

FlinkKinesisProducer<Map<String, Object>> kinesisProducer = new FlinkKinesisProducer<>(
        new MonitoringMapKinesisSchema(localKinesis), kinesisProducerConfig);

//TODO: kinesisProducer.setFailOnError(true);
kinesisProducer.setDefaultStream(kinesisTopicWrite);
// Partition key used for every record when no custom partitioner is set
kinesisProducer.setDefaultPartition("0"); //TODO: why from start?
return kinesisProducer;

TIA,  

Re: FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false?

Vijay Balakrishnan
Hi,
I am sending a Map<String, Object> to Kinesis, and it looks like everything goes to only one partition. How can I distribute this Map<String, Object> data across multiple partitions/shards of the Kinesis data stream?

Sending to Kinesis:
DataStream<Map<String, Object>> influxToMapKinesisStream = enrichedMGStream
        .map(influxDBPoint -> new MonitoringGroupingToInfluxDBPoint(agg, groupBySetArr).fromInfluxDBPoint(influxDBPoint))
        .returns(new TypeHint<Map<String, Object>>() {})
        .setParallelism(dfltParallelism);

FlinkKinesisProducer<Map<String, Object>> kinesisProducer =
        getMonitoringFlinkKinesisProducer(kinesisTopicWrite, region, local, localKinesis);
influxToMapKinesisStream.addSink(kinesisProducer).setParallelism(dfltParallelism);

The Map<String, Object> sent to Kinesis is built like this:

Map<String, Object> mapObj = new HashMap<>();
mapObj.put(Utils.EVENT_TIMESTAMP, influxDBPoint.getTimestamp());
mapObj.put(Utils.MEASUREMENT, influxDBPoint.getMeasurement());
mapObj.put(Utils.TAGS, influxDBPoint.getTags());
mapObj.put(Utils.FIELDS, influxDBPoint.getFields());

TIA,

In reply to Vijay Balakrishnan's message of Thu, Jun 4, 2020 at 5:35 PM.

Re: FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false?

Vijay Balakrishnan
Hi,
I resolved the issue by using a custom partitioner and by setting RequestTimeout in the producer properties.
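For the RequestTimeout part, a minimal sketch of where such a setting would go, assuming it is added to the same Properties object handed to FlinkKinesisProducer (the connector passes these properties on to the Kinesis Producer Library configuration). The 60000 ms timeout and the us-east-1 region are illustrative assumptions; the thread does not say which values were used. The two AWS key strings are the values of AWSConfigConstants.AWS_REGION and AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, inlined so the sketch compiles without Flink on the classpath; KinesisProducerConfigSketch and producerConfig are hypothetical names.

```java
import java.util.Properties;

public class KinesisProducerConfigSketch {
    // Values of Flink's AWSConfigConstants.AWS_REGION and AWS_CREDENTIALS_PROVIDER,
    // inlined here so this class compiles without the Flink connector.
    static final String AWS_REGION = "aws.region";
    static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

    // Builds the Properties that would be passed to new FlinkKinesisProducer<>(schema, config).
    static Properties producerConfig(String region, long requestTimeoutMillis) {
        Properties config = new Properties();
        config.setProperty(AWS_REGION, region);
        config.setProperty(AWS_CREDENTIALS_PROVIDER, "AUTO");
        // KPL setting: timeout in milliseconds for the KPL's requests to Kinesis.
        config.setProperty("RequestTimeout", Long.toString(requestTimeoutMillis));
        return config;
    }

    public static void main(String[] args) {
        // Hypothetical values; the thread does not state the ones actually used.
        Properties config = producerConfig("us-east-1", 60_000L);
        config.list(System.out);
    }
}
```

In the original setup this amounts to one more setProperty call next to the existing region and credentials lines.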

import java.util.Map;
import java.util.UUID;
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

// Assigns each record a random partition key so records spread across shards
private static final class SerializableCustomPartitioner extends KinesisPartitioner<Map<String, Object>> {

    private static final long serialVersionUID = -5196071893997035695L;

    @Override
    public String getPartitionId(Map<String, Object> map) {
        // A fresh random UUID per record; Kinesis hashes it to choose the shard
        return UUID.randomUUID().toString();
    }
}

// Registered on the producer:
kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner());
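Why the custom partitioner helps: Kinesis routes each record by taking the MD5 hash of its partition key as a 128-bit integer and sending it to the shard whose hash key range contains that value. With setDefaultPartition("0") and no custom partitioner, every record carries the key "0", so every record hashes to the same shard; a random UUID per record yields a different hash each time. The sketch below is a stdlib-only simulation, assuming a hypothetical stream of 4 shards that split the hash key space into equal ranges (a real stream's ranges come from its shard list) and assuming the key is hashed as UTF-8 bytes. ShardRoutingSketch, shardFor and distribution are illustrative names, not Flink or AWS APIs.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class ShardRoutingSketch {
    // The Kinesis hash key space is [0, 2^128).
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the partition key's UTF-8 bytes, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat the bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Shard index for a key, ASSUMING numShards shards with equal, contiguous hash key ranges.
    static int shardFor(String partitionKey, int numShards) {
        BigInteger rangeSize = HASH_SPACE.divide(BigInteger.valueOf(numShards));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        return Math.min(index, numShards - 1); // the last shard absorbs any remainder
    }

    // Number of keys routed to each shard.
    static int[] distribution(List<String> partitionKeys, int numShards) {
        int[] counts = new int[numShards];
        for (String key : partitionKeys) {
            counts[shardFor(key, numShards)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int numShards = 4; // hypothetical stream size
        List<String> fixedKeys = new ArrayList<>();
        List<String> uuidKeys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            fixedKeys.add("0");                         // like setDefaultPartition("0")
            uuidKeys.add(UUID.randomUUID().toString()); // like the UUID partitioner above
        }
        System.out.println("fixed key \"0\": " + Arrays.toString(distribution(fixedKeys, numShards)));
        System.out.println("random UUIDs:   " + Arrays.toString(distribution(uuidKeys, numShards)));
    }
}
```

Running main should show all 1000 fixed-key records on a single shard and the UUID-keyed records spread over all four. Since every record carried the same key, disabling AggregationEnabled alone would most likely not have changed this. The trade-off of random keys is ordering: Kinesis preserves order only within a shard, so a key derived from the record itself (for example measurement plus tags) would keep each series on one shard while still spreading different series across shards.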

In reply to Vijay Balakrishnan's message of Thu, Jun 4, 2020 at 6:43 PM.