[FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?


Anyang Hu
Hi guys,

In Flink 1.9, we can set `connector.type` to `kafka` and `format.type` to `json` to read JSON data from Kafka or write JSON data to Kafka.

In my scenario, I want to read local JSON data as a source table, since I need to debug locally and don't want to consume online Kafka data.

For example:
create table source (
  first varchar,
  id int
) with (
  'connector.type' = 'filesystem',
  'connector.path' = '/path/to/json',
  'format.type' = 'json'
)

In addition, writing local JSON data is also needed.
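Something like the following write side is what I have in mind (purely hypothetical, mirroring the source above; it is not supported today):

create table sink (
  first varchar,
  id int
) with (
  'connector.type' = 'filesystem',
  'connector.path' = '/path/to/output/json',
  'format.type' = 'json'
);

insert into sink select first, id from source;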

Does anyone have similar needs?

Best regards,
Anyang
Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

Dawid Wysakowicz-2

Hi,

Unfortunately it is not possible out of the box. The only format that the filesystem connector supports as of now is CSV.

As a workaround you could create a Table out of a DataStream reusing the JsonRowDeserializationSchema. Have a look at the example below:

        // imports needed by the snippet below
        import org.apache.flink.api.common.typeinfo.TypeInformation;
        import org.apache.flink.formats.json.JsonRowDeserializationSchema;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.Table;
        import org.apache.flink.table.api.java.StreamTableEnvironment;
        import org.apache.flink.types.Row;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // raw JSON records as bytes; or read from a file record by record
        DataStream<byte[]> input = env.fromElements(
            "{\"lon\": 123.23, \"rideTime\": \"2019\", \"obj\": {\"numb\": 1234}}".getBytes()
        );

        // build the deserialization schema (constructor arguments elided)
        JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder(...).build();

        // deserialize each record into a Row, keeping the schema's produced type
        TypeInformation<Row> producedType = jsonSchema.getProducedType();
        SingleOutputStreamOperator<Row> in = input.map(jsonSchema::deserialize)
            .returns(producedType);

        // register the stream as a table and query it with SQL
        tEnv.registerDataStream("t", in);

        Table table = tEnv.sqlQuery("SELECT * FROM t");
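A note on the elided Builder arguments: in Flink 1.9 the Builder can be constructed from either a Row TypeInformation or a JSON schema string. A sketch matching the sample record above (the field names and types are only illustrative) might look like:

        import org.apache.flink.api.common.typeinfo.Types;

        // describe the expected Row layout of the JSON records
        JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder(
                Types.ROW_NAMED(
                        new String[] {"lon", "rideTime", "obj"},
                        Types.DOUBLE,
                        Types.STRING,
                        Types.ROW_NAMED(new String[] {"numb"}, Types.LONG)))
                .build();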

Best,

Dawid

Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

Spico Florin
In reply to this post by Anyang Hu
Hi!

Another solution would be to install Kafka and ZooKeeper locally and push your dumped JSON data (from the production server) into a topic (i.e. you create a Kafka producer).
Then you configure your code to point to this local broker and consume the data from the topic with whichever offset strategy you need (earliest, latest).
The advantage is that you can repeat your tests multiple times, as in a real scenario.
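For illustration, a minimal producer sketch (the broker address, topic name, and dump path are just placeholders) could look like this:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class JsonDumpLoader {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // each line of the dump is assumed to be one JSON record
            for (String line : Files.readAllLines(Paths.get("/path/to/dump.json"))) {
                producer.send(new ProducerRecord<>("debug-topic", line));
            }
            producer.flush();
        }
    }
}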

Depending on your use case, your processing pipeline can behave differently when you consume from a file (batch) than when you consume from a stream (Kafka).
I had this kind of issue with some CEP functionalities.
I hope it helps.
 Regards,
 Florin 
 

Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

Anyang Hu
Hi,

Thanks Dawid and Florin.
To Dawid: 
CsvTableSource doesn't implement the DefinedProctimeAttribute and DefinedRowtimeAttributes interfaces, so we cannot use proctime and rowtime in the source DDL. Besides CSV, we also need to consume JSON and pb (protobuf) data.

To Florin: 
Installing a local Kafka and ZooKeeper introduces too many third-party components and may not be universal.

In my scenario, I need to run a local SQL job for debugging (for example, source and sink are kafka-json and the dimension table is jdbc) before submitting it to YARN. The following usage is what I want (see the sketch after this list):
1) generate local JSON data for the source and dimension tables (the source table supports proctime and rowtime);
2) replace `connector.type` with 'filesystem';
3) add `connector.path` to the source table / dimension table DDL properties;
4) the new SQL can run locally as if the data were read from Kafka and JDBC.
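To make steps 2) and 3) concrete, a hypothetical before/after for the source table (the Kafka properties are abbreviated, and whether the filesystem connector accepts the JSON format is exactly the open question):

-- online job: source read from Kafka
create table source (
  first varchar,
  id int
) with (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'json'
);

-- local debug: same schema, connector swapped to the filesystem
create table source (
  first varchar,
  id int
) with (
  'connector.type' = 'filesystem',
  'connector.path' = '/path/to/json',
  'format.type' = 'json'
);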

Thanks,
Anyang

Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?

Jingsong Li
Hi Anyang,

For your information, I plan to support the JSON format in the filesystem connector after https://issues.apache.org/jira/browse/FLINK-14256.
After FLIP-66 [1], we can define time attributes in SQL DDL regardless of the connector.
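As a rough illustration of what a FLIP-66-style time-attribute declaration in DDL looks like (the column names are made up and the connector properties are elided):

create table source (
  first varchar,
  id int,
  event_time timestamp(3),
  proc_time as PROCTIME(),
  watermark for event_time as event_time - interval '5' second
) with (
  ...
);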




--
Best, Jingsong Lee

