[Table API] how to configure a nested timestamp field

Dongwon Kim-2
Hi,
I use Flink 1.10.1 and I want to use the Table API to read JSON messages. A message looks like this:
    {
       "type":"Update",
       "location":{
          "id":"123e4567-e89b-12d3-a456-426652340000",
          "lastUpdateTime":1593866161436
       }
    }

I wrote the following program just to see whether JSON messages are correctly parsed by the Table API:
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
    tEnv
      .connect(
        new Kafka()
          .version("universal")
          .topic(consumerTopic)
          .startFromLatest()
          .properties(consumerProperties)
      )
      .withFormat(new Json())
      .withSchema(new Schema().schema(
        TableSchema.builder()
          .field("type", STRING())
          .field("location",
            ROW(
              FIELD("id", STRING()),
              // (1)
              FIELD("lastUpdateTime", BIGINT())
              // (2)
              // FIELD("lastUpdateTime", TIMESTAMP())
              // (3)
              // FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
            ))
          .build()
      ))
      .createTemporaryTable("message");
    tEnv.toAppendStream(tEnv.from("message"), Row.class)
      .print();

Note that I tried (1) BIGINT(), (2) TIMESTAMP(), and (3) TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class), one at a time.
(1) works fine, but later I can't use time-based operations like windowing.
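For context, here is the kind of windowed aggregation I'd eventually like to run. This is only a sketch and assumes a rowtime attribute named "rowtime" were available on the table, which is exactly what I can't get with option (1):
    // import org.apache.flink.table.api.Table;
    // import org.apache.flink.table.api.Tumble;
    // hypothetical: "rowtime" must be a declared event-time attribute
    Table windowed = tEnv.from("message")
      .window(Tumble.over("10.seconds").on("rowtime").as("w"))
      .groupBy("w, type")
      .select("type, w.end, type.count");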

(2) causes the following exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
... 38 more

(3) causes the following exception:
Caused by: java.time.format.DateTimeParseException: Text '1593868714814' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more

Can I read such JSON messages with time information in Flink 1.10.1?

Thanks

Dongwon
Re: [Table API] how to configure a nested timestamp field

Leonard Xu
Hi, Kim

The reason your attempts (2) and (3) failed is that the JSON format does not support converting a BIGINT to TIMESTAMP. You can first define the field as BIGINT and then use a computed column to derive a TIMESTAMP field; you can also declare the time attribute on that TIMESTAMP field so that time-based operations work in Flink 1.10.1. Note that computed columns are only supported in pure DDL; the Table API lacks this support and should be aligned in 1.12 as far as I know.
The DDL syntax is as follows:

create table test (
  `type` STRING,
  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
   timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
)   with (
  'connector' = '...',
  'format' = 'json',
  ...
);
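
If you prefer to stay in a Java program on 1.10.1, a minimal sketch is to register the same DDL through the table environment. I'm assuming the legacy 1.10 connector property keys here, and the topic and bootstrap servers are placeholders:

    // tEnv is the StreamTableEnvironment created earlier in your program (Blink planner, streaming mode).
    // Flink 1.10: DDL, including computed columns and WATERMARK, runs via sqlUpdate().
    tEnv.sqlUpdate(
      "CREATE TABLE test (\n" +
      "  `type` STRING,\n" +
      "  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,\n" +
      "  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
      "  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND\n" +
      ") WITH (\n" +
      "  'connector.type' = 'kafka',\n" +
      "  'connector.version' = 'universal',\n" +
      "  'connector.topic' = 'my-topic',\n" +                              // placeholder
      "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + // placeholder
      "  'connector.startup-mode' = 'latest-offset',\n" +
      "  'format.type' = 'json'\n" +
      ")");
    // timestampCol is now a real event-time attribute, so windows and time-based joins work on it.
    Table t = tEnv.sqlQuery("SELECT `type`, timestampCol FROM test");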


Best,
Leonard Xu



Re: [Table API] how to configure a nested timestamp field

Dongwon Kim-2
Hi Leonard,

Wow, that's great! It works like a charm.
I've never considered this approach at all.
Thanks a lot.

Best,
Dongwon


Re: [Table API] how to configure a nested timestamp field

Dongwon Kim-2
Hi Leonard,

Can I have a YAML definition corresponding to the DDL you suggested?

I tried the following (on Flink 1.11.0) but got an error:
tables:
  - name: test
    type: source-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: ...
      properties:
        bootstrap.servers: ...
        group.id: ...
    format:
      property-version: 1
      type: json
    schema:
      - name: type
        data-type: STRING
      - name: location
        data-type: >
          ROW<
            id STRING,
            lastUpdateTime BIGINT
          >
      - name: timestampCol
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: from-field
            from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
          watermarks:
            type: periodic-bounded
            delay: 5000

The SQL client doesn't complain about the file, but when I execute "SELECT timestampCol FROM test", the job fails with the following error:
Caused by: java.lang.NullPointerException
at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at SourceConversion$4.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)


Re: [Table API] how to configure a nested timestamp field

Leonard Xu
Hi, Kim

Hi Leonard,

Can I have a YAML definition corresponding to the DDL you suggested?

Unfortunately, the answer is no. The YAML you defined is parsed by the Table API and then executed, and the root cause of the error you posted is that the Table API does not support computed columns yet. There is a FLIP under discussion [1]; this should be ready in 1.12.0. BTW, I think DDL is the recommended way since Flink 1.11.0.
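
If you want to script this instead of typing the DDL into the SQL client each time, a minimal sketch on 1.11 is to issue the same DDL from a small Java program via executeSql(); the connector options below use the new 1.11-style keys, and the topic and bootstrap servers are placeholders:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner().inStreamingMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Flink 1.11: executeSql() runs DDL (including computed columns and WATERMARK) as well as queries.
    tEnv.executeSql(
      "CREATE TABLE test (\n" +
      "  `type` STRING,\n" +
      "  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,\n" +
      "  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
      "  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND\n" +
      ") WITH (\n" +
      "  'connector' = 'kafka',\n" +
      "  'topic' = 'my-topic',\n" +                               // placeholder
      "  'properties.bootstrap.servers' = 'localhost:9092',\n" +  // placeholder
      "  'scan.startup.mode' = 'latest-offset',\n" +
      "  'format' = 'json'\n" +
      ")");

    tEnv.executeSql("SELECT timestampCol FROM test").print();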

Best,
Leonard Xu



Re: [Table API] how to configure a nested timestamp field

Dongwon Kim-2
Hi Leonard,

Okay, thanks a lot for your input. 

I was just trying out the Flink SQL client and wanted to keep pre-defined YAML files, each declaring a source table over a Kafka topic.
As you advised, I'll enter the DDL manually in the SQL client on Flink 1.11.x.

Best,

Dongwon


On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu <[hidden email]> wrote:
Hi, Kim

Hi Leonard,

Can I have a YAML definition corresponding to the DDL you suggested?

Unfortunately the answer is no, the YAML you defined will parse by Table API and then execute, the root cause of your post error is Table API does not support computed column now, 

there is a FLIP under discussion[1], this should be ready in 1.12.0. BTW, I think DDL is recommended way since FLINK 1.11.0.

Best,
Leonard Xu

在 2020年7月20日,14:30,Dongwon Kim <[hidden email]> 写道:


I tried below (Flink 1.11.0) but got some error:
tables:
  - name: test
    type: source-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: ...
      properties:
        bootstrap.servers: ...
        group.id: ...
    format:
      property-version: 1
      type: json
    schema:
      - name: type
        data-type: STRING
      - name: location
        data-type: >
          ROW<
            id STRING,
            lastUpdateTime BIGINT
          >
      - name: timestampCol
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: from-field
            from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
          watermarks:
            type: periodic-bounded
            delay: 5000

SQL client doesn't complain about the file but, when I execute "SELECT timestampCol from test", the job fails with the following error message:
Caused by: java.lang.NullPointerException
at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at SourceConversion$4.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)

On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <[hidden email]> wrote:
Hi Leonard,

Wow, that's great! It works like a charm.
I've never considered this approach at all.
Thanks a lot.

Best,
Dongwon

On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <[hidden email]> wrote:
Hi, Kim

The reason your attempts (2) and (3) failed is that the JSON format does not support converting a BIGINT to TIMESTAMP. You can first define the BIGINT field and then use a computed column to extract a TIMESTAMP field; you can also define the time attribute on that TIMESTAMP field so that time-based operations work in Flink 1.10.1. But computed columns are only supported in pure DDL; the Table API lacks the support and should be aligned in 1.12 as far as I know.
The DDL syntax is as follows:

create table test (
  `type` STRING,
  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
) with (
  'connector' = '...',
  'format' = 'json',
  ...
);
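Note that FROM_UNIXTIME expects epoch seconds, which is why the computed column divides lastUpdateTime by 1000; the integer division also drops the millisecond part, so the computed timestamp effectively has second precision. A plain-Java sketch of the same arithmetic, using the lastUpdateTime value from the sample message:

    import java.time.Instant;

    public class EpochSecondsDemo {
        public static void main(String[] args) {
            long lastUpdateTimeMillis = 1593866161436L;       // value from the sample JSON
            long epochSeconds = lastUpdateTimeMillis / 1000;  // what FROM_UNIXTIME consumes
            System.out.println(Instant.ofEpochSecond(epochSeconds));  // 2020-07-04T12:36:01Z
            // The 436 ms fractional part is truncated by the integer division.
        }
    }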


Best,
Leonard Xu
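Once timestampCol is a rowtime attribute, time-based operations such as windowing become possible; a hedged usage sketch, assuming the test table above has already been registered (e.g. via tEnv.sqlUpdate(ddl) on Flink 1.10, or tEnv.executeSql(ddl) on 1.11+):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class WindowOverRowtime {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
            // 'test' is assumed to be registered via the DDL above.
            Table counts = tEnv.sqlQuery(
                "SELECT TUMBLE_START(timestampCol, INTERVAL '1' MINUTE) AS windowStart," +
                "       COUNT(*) AS updates" +
                " FROM test" +
                " GROUP BY TUMBLE(timestampCol, INTERVAL '1' MINUTE)");
        }
    }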


On Jul 4, 2020, at 21:21, Dongwon Kim <[hidden email]> wrote:

Hi,
I use Flink 1.10.1 and I want to use Table API to read JSON messages. The message looks like below.
    {
       "type":"Update",
       "location":{
          "id":"123e4567-e89b-12d3-a456-426652340000",
          "lastUpdateTime":1593866161436
       }
    }

I wrote the following program just to see whether JSON messages are correctly parsed by the Table API:
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
    tEnv
      .connect(
        new Kafka()
          .version("universal")
          .topic(consumerTopic)
          .startFromLatest()
          .properties(consumerProperties)
      )
      .withFormat(new Json())
      .withSchema(new Schema().schema(
        TableSchema.builder()
          .field("type", STRING())
          .field("location",
            ROW(
              FIELD("id", STRING()),
              // pick exactly one of the three alternatives below:
              // (1) works, but rules out time-based operations later
              FIELD("lastUpdateTime", BIGINT())
              // (2) causes the ValidationException shown below
              // FIELD("lastUpdateTime", TIMESTAMP())
              // (3) causes the DateTimeParseException shown below
              // FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
            ))
          .build()
      ))
      .createTemporaryTable("message");
    tEnv.toAppendStream(tEnv.from("message"), Row.class)
      .print();

Note that I tried BIGINT(), TIMESTAMP(), and TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) in turn.
(1) works fine, but then I can't use time-based operations like windowing.

(2) causes the following exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
... 38 more

(3) causes the following exception:
Caused by: java.time.format.DateTimeParseException: Text '1593868714814' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more

Can I read such json messages with time information in Flink 1.10.1?

Thanks

Dongwon


Reply | Threaded
Open this post in threaded view
|

Fwd: Re: [Table API] how to configure a nested timestamp field

Danny Chan

Best,
Danny Chan
---------- Forwarded message ----------
From: Danny Chan <[hidden email]>
Date: Jul 20, 2020, 4:51 PM +0800
To: Dongwon Kim <[hidden email]>
Subject: Re: [Table API] how to configure a nested timestamp field

Or is it possible to pre-define a catalog there and register the table through the SQL CLI YAML?
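A rough sketch of that idea, assuming a Hive catalog (the catalog name and conf path are placeholders): the table would be created once via DDL inside a persistent catalog, and the SQL CLI YAML would only point at that catalog instead of describing every table inline:

    catalogs:
      - name: myhive
        type: hive
        hive-conf-dir: /opt/hive-conf
        default-database: default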

Best,
Danny Chan