Hi guys,
In Flink 1.9, we can set `connector.type` to `kafka` and `format.type` to `json` to read JSON data from Kafka or write JSON data to Kafka. In my scenario, I wish to read local JSON data as a source table, since I need to debug locally without consuming the online Kafka data. For example:

create table source (

In addition, writing local JSON data is also needed. Does anyone have similar needs?

Best regards,
Anyang
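For reference, a minimal sketch of the kind of Kafka/JSON source registration referred to above, assuming Flink 1.9; the schema, topic name and broker address are placeholders, and the property keys follow the 1.9 connector descriptor naming, so adjust them to your setup:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Kafka + JSON source DDL; schema, topic and broker address are placeholders
String kafkaJsonSourceDdl =
    "CREATE TABLE source (" +
    "  id BIGINT," +
    "  msg VARCHAR" +
    ") WITH (" +
    "  'connector.type' = 'kafka'," +
    "  'connector.version' = 'universal'," +
    "  'connector.topic' = 'my_topic'," +
    "  'connector.properties.0.key' = 'bootstrap.servers'," +
    "  'connector.properties.0.value' = 'localhost:9092'," +
    "  'format.type' = 'json'," +
    "  'format.derive-schema' = 'true'," +
    "  'update-mode' = 'append'" +
    ")";
tEnv.sqlUpdate(kafkaJsonSourceDdl);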
Hi,

Unfortunately it is not possible out of the box. The only format that the filesystem connector supports as of now is CSV. As a workaround you could create a Table out of a DataStream, reusing the JsonRowDeserializationSchema. Have a look at the example below:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<byte[]> input = env.fromElements(
    "{\"lon\": 123.23, \"rideTime\": \"2019\", \"obj\": {\"numb\": 1234}}".getBytes()
); // or read from file record by record

JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder(...).build();
TypeInformation<Row> producedType = jsonSchema.getProducedType();

SingleOutputStreamOperator<Row> in = input
    .map(jsonSchema::deserialize)
    .returns(producedType);

tEnv.registerDataStream("t", in);
Table table = tEnv.sqlQuery("SELECT * FROM t");

Best,
Dawid
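If the records are already dumped into a local file with one JSON document per line, the same deserialization schema can be reused behind a file source. A rough sketch along the lines of the snippet above, reusing env, tEnv, jsonSchema and producedType; the file path and table name are placeholders:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;

// read newline-delimited JSON from a local file instead of env.fromElements
DataStream<byte[]> fileInput = env
    .readTextFile("/path/to/source.json")   // placeholder path, one JSON object per line
    .map(line -> line.getBytes(StandardCharsets.UTF_8))
    .returns(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);

SingleOutputStreamOperator<Row> rowsFromFile = fileInput
    .map(jsonSchema::deserialize)
    .returns(producedType);

tEnv.registerDataStream("fileSource", rowsFromFile);
Table fileTable = tEnv.sqlQuery("SELECT * FROM fileSource");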
Hi!

Another solution would be to install Kafka and ZooKeeper locally and push your dumped JSON data (from the production server) into a topic with a small Kafka producer. Then you configure your code to point to this local broker and consume the data from the topic with whichever strategy you need (earliest offset, latest). The advantage is that you can repeat your tests multiple times as in a real scenario; see the sketch below.

Depending on your use case, your processing pipeline can behave differently when you consume from a file (batch) than from a stream (Kafka). I ran into this kind of issue when using some CEP functionalities.

I hope it helps.

Regards,
Florin
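A rough sketch of such a one-off producer, assuming the dump file holds one JSON document per line; the broker address, topic name and file path are placeholders:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JsonDumpProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // local broker, placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // one JSON document per line in the dump file (placeholder path)
            for (String line : Files.readAllLines(Paths.get("/path/to/dump.json"))) {
                producer.send(new ProducerRecord<>("debug_topic", line));   // placeholder topic
            }
            producer.flush();
        }
    }
}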
Hi,

Thanks Dawid and Florin.

To Dawid: CsvTableSource doesn't implement the DefinedProctimeAttribute and DefinedRowtimeAttributes interfaces, so we cannot use proctime and rowtime in the source DDL. Besides CSV, we also need to consume JSON and PB data.

To Florin: installing local Kafka and ZooKeeper introduces too many third-party components and may not be universal. In my scenario, I need to run a local SQL job to debug (for example, the source and sink are Kafka/JSON and the dimension table is JDBC) before submitting it to YARN. The following usage is what I want (see the sketch after this list):
1) generate local JSON data for the source and dimension tables (the source table supports proctime and rowtime);
2) replace `connector.type` with 'filesystem';
3) add `connector.path` to the source table / dimension table DDL properties;
4) the new SQL can then run locally as if the data were read from Kafka and JDBC.

Thanks,
Anyang
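Purely as an illustration of the desired shape from steps 2 and 3 (not something Flink 1.9 accepts, since the filesystem connector currently only supports CSV); the schema and path are placeholders, and 'format.type' = 'json' here is the missing piece being asked for:

// illustrative only: the DDL shape described in the steps above; the filesystem connector
// in Flink 1.9 only ships with the CSV format, so this does not work out of the box yet
String localJsonSourceDdl =
    "CREATE TABLE source (" +
    "  id BIGINT," +
    "  msg VARCHAR" +
    ") WITH (" +
    "  'connector.type' = 'filesystem'," +
    "  'connector.path' = 'file:///path/to/source.json'," +   // placeholder local path
    "  'format.type' = 'json'" +                               // desired, not yet supported
    ")";
tEnv.sqlUpdate(localJsonSourceDdl);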
Hi Anyang,

For your information, I plan to support the JSON format in the filesystem connector after https://issues.apache.org/jira/browse/FLINK-14256.

After FLIP-66 [1], we can define time attributes in SQL DDL no matter which connector is used.
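To illustrate, a sketch of the kind of DDL this would allow once both changes land; the time-attribute syntax follows the FLIP-66 proposal and may still change, and the schema plus the filesystem/JSON properties are the same hypothetical placeholders as in the earlier sketch:

// sketch of FLIP-66 style time attributes in DDL (proposed syntax, may change):
// proc is a processing-time attribute, ts becomes an event-time attribute via the watermark
String timeAttributeDdl =
    "CREATE TABLE source_with_time (" +
    "  id BIGINT," +
    "  msg VARCHAR," +
    "  ts TIMESTAMP(3)," +
    "  proc AS PROCTIME()," +
    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector.type' = 'filesystem'," +
    "  'connector.path' = 'file:///path/to/source.json'," +   // placeholder local path
    "  'format.type' = 'json'" +
    ")";
tEnv.sqlUpdate(timeAttributeDdl);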
Best,
Jingsong Lee