Re: FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false ?
Posted by Vijay Balakrishnan
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/FlinkKinesisProducer-sends-data-to-only-1-shard-Is-it-because-I-don-t-have-AggregationEnabled-set-to-tp35722p35723.html
Hi,
It looks like the Map<String, Object> records I am sending to Kinesis all end up in a single partition. How can I distribute this Map<String, Object> data across multiple partitions/shards of the Kinesis data stream?
Sending to Kinesis:
DataStream<Map<String, Object>> influxToMapKinesisStream = enrichedMGStream.map(influxDBPoint -> {
        return new MonitoringGroupingToInfluxDBPoint(agg, groupBySetArr).fromInfluxDBPoint(influxDBPoint);
    }).returns(new TypeHint<Map<String, Object>>() {
    }).setParallelism(dfltParallelism);

FlinkKinesisProducer<Map<String, Object>> kinesisProducer = getMonitoringFlinkKinesisProducer(kinesisTopicWrite, region, local, localKinesis);
influxToMapKinesisStream.addSink(kinesisProducer).setParallelism(dfltParallelism);
The Map<String, Object> that is sent to Kinesis:
Map<String, Object> mapObj = new HashMap<>();
mapObj.put(Utils.EVENT_TIMESTAMP, influxDBPoint.getTimestamp());
mapObj.put(Utils.MEASUREMENT, influxDBPoint.getMeasurement());
mapObj.put(Utils.TAGS, influxDBPoint.getTags());
mapObj.put(Utils.FIELDS, influxDBPoint.getFields());
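One possible direction, offered as a minimal sketch rather than a confirmed fix: derive the partition key from the record itself instead of using one constant value. The helper below is plain Java with no Flink dependency. The key names "measurement" and "tags" are assumptions, since the actual values of Utils.MEASUREMENT and Utils.TAGS are not shown in this thread, and the class and method names are hypothetical. The returned string could presumably be handed back from getPartitionId() of a custom KinesisPartitioner registered through setCustomPartitioner() on the producer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class PartitionKeySketch {
    // Hypothetical key names; the real values of Utils.MEASUREMENT and
    // Utils.TAGS are not shown in this thread.
    static final String MEASUREMENT = "measurement";
    static final String TAGS = "tags";

    /** Builds a partition key that varies with the record content. */
    static String partitionKeyFor(Map<String, Object> record) {
        Object measurement = record.get(MEASUREMENT);
        Object tags = record.get(TAGS);
        // Objects.hashCode returns 0 for null, so a missing tags entry is tolerated.
        return String.valueOf(measurement) + "-" + Objects.hashCode(tags);
    }

    public static void main(String[] args) {
        Map<String, Object> cpu = new HashMap<>();
        cpu.put(MEASUREMENT, "cpu");
        cpu.put(TAGS, Collections.singletonMap("host", "h1"));

        Map<String, Object> mem = new HashMap<>();
        mem.put(MEASUREMENT, "mem");
        mem.put(TAGS, Collections.singletonMap("host", "h2"));

        // Records with different content produce different partition keys.
        System.out.println(partitionKeyFor(cpu));
        System.out.println(partitionKeyFor(mem));
    }
}
```

Records with the same measurement and tags still share a key, so how evenly the data spreads depends on how many distinct combinations occur.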
TIA,
On Thu, Jun 4, 2020 at 5:35 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,
My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false?
flink-connector-kinesis_2.11, Flink version 1.9.1
//Setup Kinesis Producer
Properties kinesisProducerConfig = new Properties();
kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
//kinesisProducerConfig.setProperty("AggregationEnabled", "false");
FlinkKinesisProducer<Map<String, Object>> kinesisProducer = new FlinkKinesisProducer<>(
        new MonitoringMapKinesisSchema(localKinesis), kinesisProducerConfig);
//TODO: kinesisProducer.setFailOnError(true);
kinesisProducer.setDefaultStream(kinesisTopicWrite);
kinesisProducer.setDefaultPartition("0");//TODO: why from start ?
return kinesisProducer;
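As background on why a constant partition key such as "0" may matter here: Kinesis maps each partition key to a 128-bit integer with MD5 and routes the record to the shard whose hash key range contains that value. The sketch below illustrates this in plain Java. The class and method names are hypothetical, and the shard index calculation assumes the shards split the hash key space evenly, which may not hold for a stream that has been resharded.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ShardRoutingSketch {
    // Size of the Kinesis hash key space: 2^128.
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    /** MD5 of the partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /**
     * Index of the shard that owns the hash key.
     * ASSUMPTION: shardCount shards that divide the hash key space evenly.
     */
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValue();
    }

    public static void main(String[] args) {
        int shards = 4;
        // A constant key always hashes to the same value, hence the same shard.
        System.out.println("\"0\" -> shard " + shardIndex("0", shards));
        // Keys that vary per record can land on different shards.
        for (String key : new String[] {"cpu-1", "mem-2", "disk-3", "net-4"}) {
            System.out.println(key + " -> shard " + shardIndex(key, shards));
        }
    }
}
```

Under this model, every record sent with the same partition key lands on the same shard regardless of the "AggregationEnabled" setting, so varying the key per record is the part worth checking first.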
TIA,