---------- Forwarded message ----------
From: Danny Chan < [hidden email]>
Date: Jul 20, 2020, 4:51 PM +0800
To: Dongwon Kim < [hidden email]>
Subject: Re: [Table API] how to configure a nested timestamp field
Or is it possible to pre-define a catalog there and register it through the SQL CLI YAML?
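A catalog pre-defined in the SQL Client YAML could look roughly like the sketch below (assuming a Hive catalog; the name and paths are placeholders):

catalogs:
  - name: myhive                      # placeholder catalog name
    type: hive
    hive-conf-dir: /opt/hive-conf     # placeholder directory containing hive-site.xml
    default-database: mydatabase      # optional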
On Jul 20, 2020, at 3:23 PM +0800, Dongwon Kim < [hidden email]> wrote:
Hi Leonard,
Unfortunately the answer is no. The YAML you defined is parsed and then executed by the Table API, and the root cause of the error you posted is that the Table API does not support computed columns yet;
there is a FLIP under discussion [1], which should be ready in 1.12.0. BTW, I think DDL is the recommended way since Flink 1.11.0.
Okay, thanks a lot for your input.
I just tried out the Flink SQL client and wanted to store pre-defined YAML files, each declaring a source table backed by a Kafka topic.
As you advised, I have to manually enter the DDL in the SQL client on Flink 1.11.x.
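For reference, a minimal sketch of that workflow (assuming a standard Flink 1.11 distribution; the CREATE TABLE statement itself is the one from Leonard's reply further below):

./bin/sql-client.sh embedded

Flink SQL> CREATE TABLE test ( ... );        -- paste the DDL from Leonard's reply
Flink SQL> SELECT timestampCol FROM test;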
Best,
Dongwon
Hi, Kim
Hi Leonard,
Can I have a YAML definition corresponding to the DDL you suggested?
Unfortunately the answer is no. The YAML you defined is parsed and then executed by the Table API, and the root cause of the error you posted is that the Table API does not support computed columns yet;
there is a FLIP under discussion [1], which should be ready in 1.12.0. BTW, I think DDL is the recommended way since Flink 1.11.0.
Best,
Leonard Xu
I tried the YAML below (Flink 1.11.0) but got an error:
tables:
  - name: test
    type: source-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: ...
      properties:
        bootstrap.servers: ...
        group.id: ...
    format:
      property-version: 1
      type: json
    schema:
      - name: type
        data-type: STRING
      - name: location
        data-type: >
          ROW<
            id STRING,
            lastUpdateTime BIGINT
          >
      - name: timestampCol
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: from-field
            from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
          watermarks:
            type: periodic-bounded
            delay: 5000
The SQL client doesn't complain about the file, but when I execute "SELECT timestampCol FROM test", the job fails with the following error message:
Caused by: java.lang.NullPointerException
at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at SourceConversion$4.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Hi Leonard,
Wow, that's great! It works like a charm.
I hadn't considered this approach at all.
Thanks a lot.
Best,
Dongwon
Hi, Kim
The reason your attempts (2) and (3) failed is that the JSON format does not support converting a BIGINT to TIMESTAMP. You can first define the field as BIGINT and then use a computed column to derive a TIMESTAMP field, and you can also define the time attribute on that TIMESTAMP field so that time-based operations work in Flink 1.10.1. However, computed columns are only supported in pure DDL; the Table API lacks this support and should be aligned in 1.12, as far as I know.
The DDL syntax is as follows:
create table test (
  `type` STRING,
  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
) with (
  'connector' = '...',
  'format' = 'json',
  ...
);
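Once the table is declared this way, the rowtime attribute can be used in time-based operations directly. For example, a tumbling-window aggregation over timestampCol might look like the sketch below (the query and the one-minute window size are only illustrative):

SELECT
  TUMBLE_START(timestampCol, INTERVAL '1' MINUTE) AS window_start,
  `type`,
  COUNT(*) AS cnt
FROM test
GROUP BY TUMBLE(timestampCol, INTERVAL '1' MINUTE), `type`;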
Best,
Leonard Xu
Hi,
I'm using Flink 1.10.1 and want to use the Table API to read JSON messages. The messages look like the one below.
{
  "type":"Update",
  "location":{
    "id":"123e4567-e89b-12d3-a456-426652340000",
    "lastUpdateTime":1593866161436
  }
}
I wrote the following program just to see whether the JSON messages are correctly parsed by the Table API:
// STRING(), ROW(), FIELD(), BIGINT(), and TIMESTAMP() are static imports from org.apache.flink.table.api.DataTypes
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);

tEnv
    .connect(
        new Kafka()
            .version("universal")
            .topic(consumerTopic)
            .startFromLatest()
            .properties(consumerProperties)
    )
    .withFormat(new Json())
    .withSchema(new Schema().schema(
        TableSchema.builder()
            .field("type", STRING())
            .field("location",
                ROW(
                    FIELD("id", STRING()),
                    // only one of the following three variants is active at a time:
                    // (1)
                    FIELD("lastUpdateTime", BIGINT())
                    // (2)
                    // FIELD("lastUpdateTime", TIMESTAMP())
                    // (3)
                    // FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
                ))
            .build()
    ))
    .createTemporaryTable("message");

tEnv.toAppendStream(tEnv.from("message"), Row.class)
    .print();
Note that I tried (1) BIGINT(), (2) TIMESTAMP(), and (3) TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class), one at a time.
(1) works fine, but then I can't use time-based operations like windowing.
(2) causes the following exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
... 38 more
(3) causes the following exception:
Caused by: java.time.format.DateTimeParseException: Text '1593868714814' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more
Can I read such JSON messages with time information in Flink 1.10.1?
Thanks
Dongwon