Hi,
My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false?

flink_connector_kinesis_2.11 : flink version 1.9.1

    //Setup Kinesis Producer
    Properties kinesisProducerConfig = new Properties();
    kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
    kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
    //kinesisProducerConfig.setProperty("AggregationEnabled", "false");

    FlinkKinesisProducer<Map<String, Object>> kinesisProducer = new FlinkKinesisProducer<>(
            new MonitoringMapKinesisSchema(localKinesis), kinesisProducerConfig);

    //TODO: kinesisProducer.setFailOnError(true);
    kinesisProducer.setDefaultStream(kinesisTopicWrite);
    kinesisProducer.setDefaultPartition("0");//TODO: why from start ?
    return kinesisProducer;

TIA,
|
Hi,
Looks like I am sending a Map<String,Object> to Kinesis and it is being sent to 1 partition only. How can I make this distribute across multiple partitions/shards on the Kinesis Data stream with this Map<String, Object> data?

Sending to Kinesis:

    DataStream<Map<String, Object>> influxToMapKinesisStream = enrichedMGStream.map(influxDBPoint -> {
        return new MonitoringGroupingToInfluxDBPoint(agg, groupBySetArr).fromInfluxDBPoint(influxDBPoint);
    }).returns(new TypeHint<Map<String, Object>>() {
    }).setParallelism(dfltParallelism);

    FlinkKinesisProducer<Map<String, Object>> kinesisProducer = getMonitoringFlinkKinesisProducer(kinesisTopicWrite, region, local, localKinesis);
    influxToMapKinesisStream.addSink(kinesisProducer).setParallelism(dfltParallelism);

Map<String, Object> used to send to Kinesis:

    Map<String, Object> mapObj = new HashMap<>();
    mapObj.put(Utils.EVENT_TIMESTAMP, influxDBPoint.getTimestamp());
    mapObj.put(Utils.MEASUREMENT, influxDBPoint.getMeasurement());
    mapObj.put(Utils.TAGS, influxDBPoint.getTags());
    mapObj.put(Utils.FIELDS, influxDBPoint.getFields());

TIA,

On Thu, Jun 4, 2020 at 5:35 PM Vijay Balakrishnan <[hidden email]> wrote:
|
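A likely cause, offered as an assumption rather than something the thread confirms: Kinesis chooses a shard by taking the MD5 hash of each record's partition key and finding the shard whose hash key range contains it. A constant partition key such as the "0" passed to setDefaultPartition therefore always lands on the same shard. The sketch below illustrates this with the JDK only. The class name ShardHashDemo, the method shardFor, and the evenly split shard ranges are illustrative assumptions; real streams can have uneven ranges after splits or merges.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

public class ShardHashDemo {
    // The Kinesis hash key space covers 128-bit unsigned integers: [0, 2^128 - 1].
    private static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    /**
     * Returns the index of the shard that would own this partition key,
     * ASSUMING the key space is split evenly across shardCount shards.
     */
    public static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            // Interpret the 16-byte digest as an unsigned 128-bit integer.
            BigInteger hashKey = new BigInteger(1, digest);
            // Shard i owns [i * size, (i + 1) * size), so index = hash * count / 2^128.
            return hashKey.multiply(BigInteger.valueOf(shardCount))
                    .divide(KEY_SPACE)
                    .intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every JVM", e);
        }
    }

    public static void main(String[] args) {
        int shards = 4; // hypothetical shard count
        Set<Integer> hitByConstantKey = new HashSet<>();
        Set<Integer> hitByVariedKeys = new HashSet<>();
        for (int i = 0; i < 1000; i++) {
            hitByConstantKey.add(shardFor("0", shards));      // same key every time
            hitByVariedKeys.add(shardFor("key-" + i, shards)); // a different key per record
        }
        System.out.println("Shards hit with constant key: " + hitByConstantKey.size());
        System.out.println("Shards hit with varied keys:  " + hitByVariedKeys.size());
    }
}
```

If this assumption holds, the AggregationEnabled setting is not what pins the records to one shard; the partition key has to vary per record for the load to spread.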
Hi,
Resolved the issue by using a Custom Partitioner and setting RequestTimeout properties.

    kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner());

    private static final class SerializableCustomPartitioner extends KinesisPartitioner<Map<String,Object>> {

On Thu, Jun 4, 2020 at 6:43 PM Vijay Balakrishnan <[hidden email]> wrote:
|
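The body of SerializableCustomPartitioner is not shown in the thread, so the following is only a sketch of what such a partitioner might look like, not the poster's implementation. To keep it compilable without Flink on the classpath, it extends a hypothetical stand-in class (PartitionerStandIn) that mirrors the getPartitionId method of Flink's KinesisPartitioner; in a real job the class would extend KinesisPartitioner<Map<String, Object>> instead. The field name "measurement" and the random-key fallback are also assumptions.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class PartitionerSketch {

    /** HYPOTHETICAL stand-in for Flink's KinesisPartitioner<T>, so this file compiles on its own. */
    public abstract static class PartitionerStandIn<T> implements Serializable {
        public abstract String getPartitionId(T element);
    }

    /**
     * Uses one field of the record as the partition key, so records with different
     * field values can land on different shards. Falls back to a random key when
     * the field is missing.
     */
    public static final class FieldPartitioner extends PartitionerStandIn<Map<String, Object>> {
        private static final long serialVersionUID = 1L;

        private final String keyField; // name of the map entry used as the partition key

        public FieldPartitioner(String keyField) {
            this.keyField = keyField;
        }

        @Override
        public String getPartitionId(Map<String, Object> element) {
            Object value = element.get(keyField);
            return value != null ? String.valueOf(value) : UUID.randomUUID().toString();
        }
    }

    public static void main(String[] args) {
        FieldPartitioner partitioner = new FieldPartitioner("measurement"); // assumed field name

        Map<String, Object> record = new HashMap<>();
        record.put("measurement", "cpu");

        System.out.println("Partition key: " + partitioner.getPartitionId(record));
        System.out.println("Fallback key:  " + partitioner.getPartitionId(new HashMap<>()));
    }
}
```

Keying on a record field keeps records with the same value on the same shard, which preserves their relative order, but it only spreads load if that field takes many distinct values. A purely random key spreads load more evenly at the cost of that per-key ordering.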