Flink SQL update-mode set to retract in env file.


Flink SQL update-mode set to retract in env file.

srikanth flink
How could I configure the environment file for Flink SQL with update-mode: retract?

I have this for append:
properties:        
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092
        - key: group.id
          value: reconMultiAttempFail
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'a': {
               type: 'string'
            },
            'b': {
               type: 'string'
            },
            'cnt': {
               type: 'string'
            }
          }
        }
      derive-schema: false

    schema:
      - name: 'a'
        type: VARCHAR
      - name: 'b'
        type: VARCHAR
      - name: 'cnt'
        type: BIGINT
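
(For context, the snippet above sits inside a table entry of the environment file roughly like the sketch below; the table name and topic are placeholders and the connector details are trimmed. `update-mode` is the key I would like to switch from append to retract.)

tables:
  - name: reconTable             # placeholder name
    type: source-sink-table
    update-mode: append          # <- the setting I would like to change to retract
    connector:
      type: kafka
      version: "universal"
      topic: myTopic             # placeholder topic
      properties:
        - key: bootstrap.servers
          value: localhost:9092
    format:
      type: json                 # json format as above
    schema:
      - name: 'a'                # schema fields as above
        type: VARCHAR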

I couldn't find any documentation for this.

Could someone help me with the syntax?

Thanks
Srikanth


Re: Flink SQL update-mode set to retract in env file.

Terry Wang
Hi srikanth~

The Flink SQL update-mode is inferred from the target table type.
For now, there are three StreamTableSink types: `AppendStreamTableSink`, `UpsertStreamTableSink`, and `RetractStreamTableSink`.
If the target table is a Kafka table, which implements AppendStreamTableSink, the update-mode will be append only.
So if you want to enable retract mode, you may need to insert into some kind of RetractStreamTableSink.
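As a rough, untested sketch (class and field names are made up, just to illustrate the 1.9-era interface): a custom RetractStreamTableSink consumes a DataStream of Tuple2<Boolean, Row>, where the Boolean flag marks an add (true) or a retraction (false) of a previously emitted row.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

// Illustrative only: a retract sink that simply prints add/retract messages.
public class MyRetractSink implements RetractStreamTableSink<Row> {

    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // f0 == true is an "add", f0 == false retracts a previously emitted row.
        return dataStream.addSink(new PrintSinkFunction<>());
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        return this;
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(fieldNames, fieldTypes);
    }
}

Such a sink would then still have to be registered (e.g. via tableEnv.registerTableSink in a program, or through a table factory for the SQL Client) before an INSERT INTO could target it.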
Hope it helps you ~

Best,
Terry Wang


Re: Flink SQL update-mode set to retract in env file.

srikanth flink
Hi Terry Wang,

Thanks for the quick reply.

I would like to understand your line "If the target table is a Kafka table, which implements AppendStreamTableSink, the update-mode will be append only" a bit better.
If that means retract mode cannot be used for Kafka sinks because the connector implements AppendStreamTableSink, how is the code below working for me and writing data to Kafka?
DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {
    Row r = t.f1;
    ObjectNode node = mapper.createObjectNode();
    node.put("source.ip", r.getField(0).toString());
    node.put("destination.ip", r.getField(1).toString());
    node.put("cnt", Long.parseLong(r.getField(2).toString()));
    return node.toString();
});

Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

outStreamAgg.addSink(new FlinkKafkaProducer<String>("reconMultiAttempFail", new SimpleStringSchema(),
        kafkaProducerProperties));

Does that mean the above works only with the Table API and not with SQL?
Please explain.

Thanks
Srikanth


Re: Flink SQL update-mode set to retract in env file.

Terry Wang
Hi, Srikanth~

In your code, `tableEnv.toRetractStream(resultTable, Row.class).map(t -> {...})` converts the resultTable into a DataStream that is no longer related to the Table API,
and the following `outStreamAgg.addSink(...)` is just a normal stream write to a FlinkKafkaProducer sink function.
Your program is therefore a mixture of Table API and DataStream programming, not pure Table API.
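As a rough sketch (untested, assuming the same tableEnv and resultTable as in your snippet): toRetractStream gives you Tuple2<Boolean, Row> elements, where f0 == true marks an added row and f0 == false a retraction of a previously emitted row. If you only want additions forwarded to Kafka, you could filter on that flag before mapping:

// Keep only "add" records and drop retraction messages before serializing.
DataStream<String> addsOnly = tableEnv
        .toRetractStream(resultTable, Row.class)
        .filter(t -> t.f0)           // f0: add (true) / retract (false) flag
        .map(t -> t.f1.toString());  // f1: the Row payload

Otherwise, retracted rows end up in the topic looking just like newly added rows.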

Best,
Terry Wang


Re: Flink SQL update-mode set to retract in env file.

srikanth flink
Awesome, thanks!
