Hi all,
I am reading JSON data from Kafka and printing it to the console. Deserialization fails for the TIME and TIMESTAMP fields.

JSON data in Kafka:

```
{
  "server_date": "2019-07-09",
  "server_time": "14:02:00",
  "reqsndtime_c": "2019-07-09 02:02:00.040"
}
```

Flink code:

```
bsTableEnv.connect(
    new Kafka()
        .version("universal")
        .topic("xxx")
        .property("bootstrap.servers", "localhost:9092")
        .property("zookeeper.connect", "localhost:2181")
        .property("group.id", "g1")
        .startFromEarliest()
).withFormat(
    new Json()
        .failOnMissingField(false)
).withSchema(
    new Schema()
        .field("server_date", DataTypes.DATE())
        .field("server_time", DataTypes.TIME())
        .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
).inAppendMode()
 .createTemporaryTable("xxx");
```

`server_date` with DataTypes.DATE() works, but `server_time` with DataTypes.TIME() and `reqsndtime_c` with DataTypes.TIMESTAMP(3) cause errors. If I change them to DataTypes.STRING(), everything works.

Error message:

```
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
    at cn.com.agree.Main.main(Main.java:122)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
    ... 31 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.io.IOException: Failed to deserialize JSON object.
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.time.format.DateTimeParseException: Text '14:02:00' could not be parsed at index 8
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
    ... 7 more

Process finished with exit code 1
```

`reqsndtime_c` with DataTypes.TIMESTAMP(3) fails with a similar exception. According to the Javadoc, DataTypes.TIME() values range from `00:00:00` to `23:59:59`, and DataTypes.TIMESTAMP values range from `0000-01-01 00:00:00.000000000` to `9999-12-31 23:59:59.999999999`. My values are within those ranges, so I don't know why parsing fails. I also read that this might be a bug in Java 8, but after switching to JDK 11 the error still occurs.

Can someone give me some help? Thanks in advance.
|
By the way, my Flink version is 1.10.0.

Original Message
Sender: Outlook <[hidden email]>
Recipient: user <[hidden email]>
Date: Tuesday, Feb 25, 2020 17:43
Subject: TIME/TIMESTAMP parse in Flink TABLE/SQL API
|
Hi,

I found that JsonRowDeserializationSchema only supports date-time values with a time zone, as specified by RFC 3339. So you need to add a time zone to the time data (like 14:02:00Z) and to the timestamp data (like 2019-07-09T02:02:00.040Z).

Hope this helps.

Bests,
godfrey

On Tue, Feb 25, 2020 at 5:49 PM, Outlook <[hidden email]> wrote:
|
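To illustrate godfrey's point about the missing time zone, here is a minimal sketch that uses only the JDK's `java.time` API. The formatter is an assumption that approximates an RFC 3339 style time with a mandatory trailing `Z`; it is not copied from Flink's JsonRowDeserializationSchema, and the class name `Rfc3339TimeSketch` is hypothetical.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;

public class Rfc3339TimeSketch {
    // Assumed formatter (not Flink's): HH:mm:ss followed by a mandatory literal 'Z'.
    static final DateTimeFormatter TIME_WITH_Z = new DateTimeFormatterBuilder()
            .appendPattern("HH:mm:ss")
            .appendLiteral('Z')
            .toFormatter();

    // Returns the parsed time, or null if the text does not match the formatter.
    static LocalTime tryParse(String text) {
        try {
            return LocalTime.parse(text, TIME_WITH_Z);
        } catch (DateTimeParseException e) {
            System.out.println("Rejected '" + text + "' at index " + e.getErrorIndex());
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryParse("14:02:00"));   // value from the original post, no zone designator
        System.out.println(tryParse("14:02:00Z"));  // same value with a zone designator
    }
}
```

With a formatter like this, the value from the original post is rejected because the text ends where the `Z` is expected, while `14:02:00Z` is accepted.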
Hi Outlook,

Godfrey is right, you should follow the JSON format [1] when you parse your JSON message. You can use the following code to produce a JSON date-time string.

```
Long time = System.currentTimeMillis();
```
|
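The snippet in Leonard's post stops at reading the current time. As a hedged sketch of one way to finish the idea with the JDK only: `Instant.toString()` emits an ISO-8601 string in UTC with a trailing `Z`, which has the shape godfrey describes. The class and method names are hypothetical, and this is not necessarily the code Leonard had in mind.

```java
import java.time.Instant;

public class JsonTimestampSketch {
    // Converts epoch milliseconds to an ISO-8601 UTC string with a trailing 'Z'.
    static String toUtcString(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long time = System.currentTimeMillis();
        System.out.println(toUtcString(time));
    }
}
```

Note that `Instant.toString()` omits the fractional seconds when they are zero, so a consumer that requires a fixed number of fraction digits would need an explicit formatter instead.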
Thanks Godfrey and Leonard, I tried your suggestions and the result is OK.

BTW, if only this format will be accepted for the long term, I think the Javadoc of the TIME and TIMESTAMP methods in `org.apache.flink.table.api.DataTypes` should be updated, because it currently does not describe what the methods actually support. For example:

```
/**
 * Data type of a time WITHOUT time zone {@code TIME} with no fractional seconds by default.
 *
 * <p>An instance consists of {@code hour:minute:second} with up to second precision
 * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
 *
 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
 * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
 *
 * @see #TIME(int)
 * @see TimeType
 */
public static DataType TIME() {
    return new AtomicDataType(new TimeType());
}
```

Thanks again.

Original Message
Sender: Leonard Xu <[hidden email]>
Recipient: godfrey he <[hidden email]>
Cc: Outlook <[hidden email]>; user <[hidden email]>
Date: Tuesday, Feb 25, 2020 22:56
Subject: Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
|
Hi Outlook,

The explanation in DataTypes is correct; it is compliant with the SQL standard. The problem is that JsonRowDeserializationSchema only supports RFC 3339. On the other hand, CsvRowDeserializationSchema can parse "2019-07-09 02:02:00.040".

So the question is: shall we insist on the RFC 3339 "standard", or shall we loosen it for usability? What do you think, [hidden email]?

Best,
Jark

On Wed, 26 Feb 2020 at 09:29, Outlook <[hidden email]> wrote:
|
Yes, these type definitions are general. As a user/developer, I would support “loosening it for usability”. If not, perhaps some explanation about JSON could be added to the documentation.

Original Message
Sender: Jark Wu <[hidden email]>
Recipient: Outlook <[hidden email]>; Dawid Wysakowicz <[hidden email]>
Date: Wednesday, Feb 26, 2020 09:55
Subject: Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
|
Yes, I'm also in favor of loosening the datetime format constraint. I guess most users don't know that there is a JSON standard which follows RFC 3339.

Best,
Jark

On Wed, 26 Feb 2020 at 10:06, NiYanchun <[hidden email]> wrote:
|
Hi all,

@NiYanchun Thank you for reporting this. Yes, I think we could improve the behaviour of the JSON format.

@Jark First of all, I do agree we could/should improve the "user-friendliness" of the JSON format (and unify the behaviour across text-based formats). I am not sure, though, if it is as simple as just ignoring the time zone here.

My suggestion would rather be to apply the logic of parsing a SQL timestamp literal (if the expected type is of LogicalTypeFamily.TIMESTAMP), which would also derive the "stored" type of the timestamp (either WITHOUT TIMEZONE or WITH TIMEZONE), and then apply a proper SQL conversion. Therefore:

| parsed type | requested type | behaviour |
|---|---|---|
| WITHOUT TIMEZONE | WITH TIMEZONE | store the local timezone with the data |
| WITHOUT TIMEZONE | WITH LOCAL TIMEZONE | do nothing in the data, interpret the time in local timezone |
| WITH TIMEZONE | WITH LOCAL TIMEZONE | convert the timestamp to local timezone and drop the time zone information |
| WITH TIMEZONE | WITHOUT TIMEZONE | drop the time zone information |

It might just boil down to what you said: "being more lenient with regards to parsing the time zone". Nevertheless, I think this way the behaviour is a bit better defined, especially as it is defined when converting between representations with or without a time zone.

An implementation note: I think we should aim to base the implementation on the DataTypes already, rather than going back to the TypeInformation.

I would still try to keep the RFC 3339 compatibility mode, but maybe for that mode it would make sense to not support any types WITHOUT TIMEZONE? This would be enabled with a switch (disabled by default). As I understand the RFC, making the time zone mandatory is actually a big part of the standard, as it makes time types unambiguous.

What do you think?

PS. I cross-posted this on the dev ML.

Best,
Dawid

On 26/02/2020 03:45, Jark Wu wrote:
|
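As a rough illustration of the four conversions in Dawid's matrix, here is a sketch in plain `java.time`. It assumes that WITHOUT TIMEZONE maps to `LocalDateTime`, WITH TIMEZONE to `OffsetDateTime`, and WITH LOCAL TIMEZONE to `Instant`; that mapping, the class name, and the method names are illustrative assumptions and not the behaviour of any Flink format.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;

public class TimestampConversionSketch {
    // WITHOUT TIMEZONE -> WITH TIMEZONE: store the local zone's offset with the data.
    static OffsetDateTime withoutToWith(LocalDateTime parsed, ZoneId localZone) {
        return parsed.atZone(localZone).toOffsetDateTime();
    }

    // WITHOUT TIMEZONE -> WITH LOCAL TIMEZONE: interpret the wall-clock time in the local zone.
    static Instant withoutToLocal(LocalDateTime parsed, ZoneId localZone) {
        return parsed.atZone(localZone).toInstant();
    }

    // WITH TIMEZONE -> WITH LOCAL TIMEZONE: keep the point in time, drop the original offset.
    static Instant withToLocal(OffsetDateTime parsed) {
        return parsed.toInstant();
    }

    // WITH TIMEZONE -> WITHOUT TIMEZONE: keep the wall-clock fields, drop the offset.
    static LocalDateTime withToWithout(OffsetDateTime parsed) {
        return parsed.toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime local = LocalDateTime.of(2019, 7, 9, 2, 2, 0, 40_000_000);
        ZoneId zone = ZoneId.of("+08:00"); // example "local" zone, chosen arbitrarily
        System.out.println(withoutToWith(local, zone));
        System.out.println(withoutToLocal(local, zone));
    }
}
```

The last two conversions lose information (the original offset), which is the part of the matrix that a format would have to apply silently.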
Hi Dawid,

I agree with you. If we want to loosen the format constraint, the important piece is the conversion matrix. The conversion matrix you listed makes sense to me. From my understanding, there should be 6 combinations: we can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH TIMEZONE to make the matrix complete.

Once the community reaches an agreement on this, we should write it down in the documentation and follow the matrix in all text-based formats.

Regarding the RFC 3339 compatibility mode switch, it also sounds good to me.

Best,
Jark

On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz <[hidden email]> wrote:
|
Thanks all for your discussion.

+1 to applying the logic of parsing a SQL timestamp literal.

I don't fully understand the matrix you listed. Should this be the semantics of SQL CAST? Do you mean this is an implicit cast in the JSON parser? I doubt that, because these implicit casts are not supported in LogicalTypeCasts. And it is hard to understand when it occurs silently.

How about adding a "timestampFormat" property to the JSON parser? Its default value would be the SQL timestamp literal format, and users could configure it.

Best,
Jingsong Lee

On Wed, Feb 26, 2020 at 6:39 PM Jark Wu <[hidden email]> wrote:
|
Hi Jingsong,

I don't think it should follow SQL CAST semantics, because it happens outside of SQL, in the connectors, which convert users'/external formats into SQL types.

I also suspect that "timestampFormat" may not work in some cases, because the timestamp formats may be various and mixed within a topic.

Best,
Jark

On Wed, 26 Feb 2020 at 22:20, Jingsong Li <[hidden email]> wrote:
|
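For readers following the "loosen the format" side of the discussion, here is a hedged sketch of what a lenient parser could look like with the JDK alone: one formatter that accepts either a space or a `T` between date and time, with an optional offset. This is purely illustrative; `LenientTimestampSketch` is a hypothetical name, and the sketch reflects neither the "timestampFormat" proposal nor any existing Flink option.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.TemporalAccessor;

public class LenientTimestampSketch {
    // Deliberately lax: the 'T' and ' ' separators and the offset are all optional.
    static final DateTimeFormatter LENIENT = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .optionalStart().appendLiteral('T').optionalEnd()
            .optionalStart().appendLiteral(' ').optionalEnd()
            .append(DateTimeFormatter.ISO_LOCAL_TIME)
            .optionalStart().appendOffsetId().optionalEnd()
            .toFormatter();

    // Returns an OffsetDateTime when the text carries an offset, otherwise a LocalDateTime.
    static TemporalAccessor parse(String text) {
        return LENIENT.parseBest(text, OffsetDateTime::from, LocalDateTime::from);
    }

    public static void main(String[] args) {
        System.out.println(parse("2019-07-09 02:02:00.040").getClass().getSimpleName());
        System.out.println(parse("2019-07-09T02:02:00.040Z").getClass().getSimpleName());
    }
}
```

Because the result type depends on the input, a caller still has to decide how to map each parsed type to the requested SQL type, which is the question the conversion matrix in this thread tries to answer.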
Hi Jark,

The matrix looks like SQL CAST to me. If we need to introduce another conversion matrix that differs from SQL CAST, I don't understand the benefits; it makes things difficult for me to understand.

And it seems bad to silently change timestamps with different time zones to the same value.

What approach do you think can solve all the problems?

Best,
Jingsong Lee

On Wed, Feb 26, 2020 at 10:45 PM Jark Wu <[hidden email]> wrote:
|
Hi Jingsong, Dawid,

I created https://issues.apache.org/jira/browse/FLINK-16725 to track this issue. We can continue the discussion there.

Best,
Jark

On Thu, 27 Feb 2020 at 10:32, Jingsong Li <[hidden email]> wrote:
|