Hello,

My problem concerns the mapping between the Flink table format and the table schema. The input data is JSON similar to the following:

{
  "id": "123",
  "serial": "6b0c2d26",
  "msg": {
    "f1": "5677"
  }
}

The format code for the TableSource is as follows:

new Json().schema(Types.ROW(
    new String[] {"id", "serial", "msg"},
    new TypeInformation<?>[] {
        Types.STRING(),
        Types.STRING(),
        Types.ROW(new String[] {"f1"}, new TypeInformation<?>[] {Types.STRING()})
    }));

The schema part of the TableSource is as follows:

Schema schema = new Schema();
schema.field("id", Types.STRING());
schema.field("serial", Types.STRING());

I don't know how to define the f1 field of msg in the schema. I tried schema.field("f1", Types.STRING()), but it reports an error. What is the correct way to define it so that the following SQL can run correctly?

select id, serial, f1 from table;

My Flink version is 1.8.1, and I am using the Flink Table & SQL API.

Thanks
|
It should be schema.field("msg", Types.ROW(...)), shouldn't it?
And you should select msg.f1 from table.

Best,
Jingsong Lee
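
To illustrate why a bare f1 is rejected while msg.f1 works, here is a minimal plain-Java sketch of a nested row. It is not Flink code and does not reproduce Flink's resolution logic; the class NestedRowDemo and its methods sampleRow and resolve are hypothetical names used only for this illustration. The point it shows is that f1 exists only inside msg, so it has to be reached through msg.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of a nested row; hypothetical illustration, not Flink code.
final class NestedRowDemo {

    // Builds a row shaped like the JSON message: id, serial, msg{f1}.
    static Map<String, Object> sampleRow() {
        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("f1", "5677");

        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", "123");
        row.put("serial", "6b0c2d26");
        row.put("msg", msg);
        return row;
    }

    // Resolves a dotted path such as "msg.f1" one level at a time.
    // Returns empty when a segment does not exist at its level.
    static Optional<Object> resolve(Map<String, Object> row, String path) {
        Object current = row;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return Optional.empty();
            }
            Map<?, ?> level = (Map<?, ?>) current;
            if (!level.containsKey(segment)) {
                return Optional.empty();
            }
            current = level.get(segment);
        }
        return Optional.ofNullable(current);
    }

    public static void main(String[] args) {
        Map<String, Object> row = sampleRow();
        System.out.println(resolve(row, "f1"));     // f1 is not a top-level field
        System.out.println(resolve(row, "msg.f1")); // f1 is reached through msg
    }
}
```

In the same way, the table schema needs msg declared as a row type at the top level, and the query then addresses the inner field as msg.f1.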
|
Thank you!
Let me try.
|
I want to output the query results to Kafka in the following JSON format:

{
  "id": "123",
  "serial": "6b0c2d26",
  "msg": {
    "f1": "5677"
  }
}

How do I define the format and schema of the Kafka sink?

Thanks!
|
I have found a way: wrap the nested field in row(...) in the query.

select row(msg.f1) from table.
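
As a rough picture of what the row(...) wrapper achieves, the plain-Java sketch below nests a flat f1 value under msg and renders the record in the JSON shape requested for the Kafka sink. It is not Flink code; the class NestedSinkDemo and its methods toSinkRecord and toJson are hypothetical names for this illustration, and the JSON rendering is deliberately minimal (string values and nested maps only, no escaping).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Plain-Java model of the sink record; hypothetical illustration, not Flink code.
final class NestedSinkDemo {

    // Wraps the flat f1 value in a one-field row named msg,
    // which is the role row(msg.f1) plays in the query.
    static Map<String, Object> toSinkRecord(String id, String serial, String f1) {
        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("f1", f1);

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", id);
        record.put("serial", serial);
        record.put("msg", msg);
        return record;
    }

    // Minimal JSON rendering: handles string values and nested maps only.
    static String toJson(Map<String, Object> record) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> entry : record.entrySet()) {
            Object value = entry.getValue();
            String rendered;
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                rendered = toJson(nested);
            } else {
                rendered = "\"" + value + "\"";
            }
            json.add("\"" + entry.getKey() + "\":" + rendered);
        }
        return json.toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson(toSinkRecord("123", "6b0c2d26", "5677")));
    }
}
```

The sink schema would correspondingly declare msg as a row type with a single string field f1, mirroring the source schema.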