How can I configure the environment file for Flink SQL with update-mode: retract? I have this for append:

    properties:
      - key: zookeeper.connect
        value: localhost:2181
      - key: bootstrap.servers
        value: localhost:9092
      - key: group.id
        value: reconMultiAttempFail
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'a': { type: 'string' },
            'b': { type: 'string' },
            'cnt': { type: 'string' }
          }
        }
      derive-schema: false
    schema:
      - name: 'a'
        type: VARCHAR
      - name: 'b'
        type: VARCHAR
      - name: 'cnt'
        type: BIGINT

I couldn't find any documentation for this. Could someone help me with the syntax? Thanks, Srikanth
|
Hi srikanth~
The Flink SQL update mode is inferred from the type of the target table. For now, there are three StreamTableSink types: `AppendStreamTableSink`, `UpsertStreamTableSink`, and `RetractStreamTableSink`. If the target table is a Kafka table, which implements AppendStreamTableSink, the update mode will be append only. So if you want to enable retract mode, you need to insert into a table backed by some kind of RetractStreamTableSink. Hope this helps ~ Best, Terry Wang
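To make the difference between the modes concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a running count per key would be encoded as retract messages versus upsert messages. The class `UpdateModeSketch`, its method names, and the message strings are hypothetical and only model the idea; they are not Flink API. An append-only sink accepts insertions only, which is why a result that changes over time cannot be written to it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, NOT Flink API: all names here are hypothetical.
// It mimics the changes produced by: SELECT key, COUNT(*) ... GROUP BY key
class UpdateModeSketch {

    // Retract mode: an update to an existing result is sent as a
    // "-" message (retract the old row) followed by a "+" message (add the new row).
    static List<String> retractMessages(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            if (old != null) {
                out.add("-(" + key + "," + old + ")");
            }
            long updated = (old == null ? 0L : old) + 1;
            counts.put(key, updated);
            out.add("+(" + key + "," + updated + ")");
        }
        return out;
    }

    // Upsert mode: each change is a single message identified by the key;
    // the receiver overwrites whatever it stored for that key before.
    static List<String> upsertMessages(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            long updated = counts.merge(key, 1L, Long::sum);
            out.add("upsert(" + key + "," + updated + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a");
        System.out.println(retractMessages(keys)); // [+(a,1), +(b,1), -(a,1), +(a,2)]
        System.out.println(upsertMessages(keys));  // [upsert(a,1), upsert(b,1), upsert(a,2)]
    }
}
```

The second occurrence of key `a` is the interesting case: retract mode needs two messages for it, upsert mode one, and append mode has no way to express it.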
|
Hi Terry Wang, Thanks for the quick reply. I would like to understand more about your line "If the target table is a type of Kafka which implements AppendStreamTableSink, the update-mode will be append only". If that means retract mode cannot be used for Kafka sinks because they implement AppendStreamTableSink, then why does the code below work for me, writing data to Kafka?

    DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {
        // t.f0 is the add/retract flag (unused here); t.f1 is the row itself
        Row r = t.f1;
        ObjectNode node = mapper.createObjectNode();
        node.put("source.ip", r.getField(0).toString());
        node.put("destination.ip", r.getField(1).toString());
        node.put("cnt", Long.parseLong(r.getField(2).toString()));
        return node.toString();
    });

    Properties kafkaProducerProperties = new Properties();
    kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
    kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
    kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

    outStreamAgg.addSink(new FlinkKafkaProducer<String>("reconMultiAttempFail", new SimpleStringSchema(), kafkaProducerProperties));

Is it that the above functionality works only with the Table API and not with SQL? Please explain. Thanks, Srikanth
On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <[hidden email]> wrote:
|
Hi, Srikanth~
In your code, `DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {});` has converted resultTable into a DataStream that is no longer tied to the Table API, and the following `outStreamAgg.addSink(…)` is just a normal DataStream write to a Flink Kafka sink function. Your program is a mixture of Table API and DataStream programming, not purely Table API. Best, Terry Wang
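One consequence worth illustrating: `toRetractStream` yields pairs of a Boolean flag and a row, where `true` marks an added row and `false` a retracted one, and a `map` that reads only the row sends both kinds to the sink as ordinary records. The plain-Java sketch below models this with `Map.Entry<Boolean, String>` standing in for Flink's `Tuple2<Boolean, Row>`; the class `RetractFlagSketch` and its helpers are hypothetical names, and filtering on the flag is only one possible way to handle it, not something the thread prescribes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model, NOT Flink API: Map.Entry<Boolean, String> stands in for
// Tuple2<Boolean, Row>; the key is the add/retract flag, the value is the row.
class RetractFlagSketch {

    // Hypothetical helper that builds one change message.
    static Map.Entry<Boolean, String> change(boolean isAdd, String row) {
        return new SimpleEntry<>(isAdd, row);
    }

    // Reads only the row, like a map() that never looks at the flag:
    // retractions reach the sink looking exactly like additions.
    static List<String> ignoreFlag(List<Map.Entry<Boolean, String>> changes) {
        return changes.stream()
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    // Keeps only the rows flagged as additions and drops the retractions.
    static List<String> keepAdditionsOnly(List<Map.Entry<Boolean, String>> changes) {
        return changes.stream()
                .filter(Map.Entry::getKey)
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A count for one key going from 1 to 2, encoded as retract messages.
        List<Map.Entry<Boolean, String>> changes = Arrays.asList(
                change(true, "a,1"),
                change(false, "a,1"),
                change(true, "a,2"));
        System.out.println(ignoreFlag(changes));        // [a,1, a,1, a,2]
        System.out.println(keepAdditionsOnly(changes)); // [a,1, a,2]
    }
}
```

In the first output the retracted row `a,1` appears twice with nothing to tell the two apart, so a consumer of the topic could not know which record was a retraction.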
|
Awesome, thanks! On Thu, Sep 26, 2019 at 5:50 PM Terry Wang <[hidden email]> wrote:
|