Need help on timestamp type conversion for Table API on Pravega Connector


B.Zhou

Hi community,

 

The Pravega connector provides both Batch and Streaming Table API implementations. We use the descriptor API to build the table source. While preparing to upgrade to Flink 1.10, we found that the unit tests for our existing Batch Table API no longer pass: there is a type conversion error on the Timestamp field with our descriptor Table API. The details are in this issue: https://github.com/pravega/flink-connectors/issues/341. We hope someone from the Flink community can offer some suggestions. Thanks.

 

Best Regards,

Brian

 

Re: Need help on timestamp type conversion for Table API on Pravega Connector

Till Rohrmann
Thanks for reporting this issue, Brian. I'm not a Table API expert, but I know there is ongoing work on the type system. I've pulled Timo and Jingsong into the conversation; they might be able to tell you what exactly changed and whether those changes caused the timestamp issue.

Cheers,
Till


Re: Need help on timestamp type conversion for Table API on Pravega Connector

Jark Wu-3


This may be a similar issue to [1]; let's continue the discussion there.

Best,
Jark

[1]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Timetamp-types-incompatible-after-migration-to-1-10-td33784.html#a33791

RE: Need help on timestamp type conversion for Table API on Pravega Connector

B.Zhou

Hi,

 

Thanks for the reference, Jark. In the Pravega connector, the user first defines a Schema and then creates the table from a descriptor that uses that schema (see [1]); the error comes from this test case. We also tried the recommended `bridgedTo(Timestamp.class)` method when constructing the schema, but it produced the same stack trace.

We are also considering switching to the Blink planner. Do you think that change would fix this issue?

 

Here is the full stacktrace:

 

```
org.apache.flink.table.codegen.CodeGenException: Unsupported cast from 'LocalDateTime' to 'Long'.
    at org.apache.flink.table.codegen.calls.ScalarOperators$.generateCast(ScalarOperators.scala:815)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:941)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$visitCall$1(CodeGenerator.scala:752)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:742)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1(CodeGenerator.scala:273)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(CodeGenerator.scala:269)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
    at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.generateConversionMapper(BatchScan.scala:95)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow(BatchScan.scala:59)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow$(BatchScan.scala:35)
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.convertToInternalRow(BatchTableSourceScan.scala:45)
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:165)
    at org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:114)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:92)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
    at org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:87)
    at io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceBatchDescriptor(FlinkPravegaTableITCase.java:349)
    at io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceUsingDescriptor(FlinkPravegaTableITCase.java:246)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code -1
```
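
For readers less familiar with the types named in the first line of the trace, here is a minimal JDK-only sketch of the three timestamp representations involved: `java.time.LocalDateTime`, `java.sql.Timestamp`, and a `long` of epoch milliseconds. It is not Flink code and does not reproduce or fix the planner error; the class name `TimestampConversions`, its helper methods, and the choice of UTC as the offset are all assumptions made for illustration.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper class, for illustration only (not part of Flink or Pravega).
public class TimestampConversions {

    // LocalDateTime carries no zone, so an offset has to be chosen to obtain
    // epoch milliseconds. UTC is an assumption of this sketch.
    static long toEpochMillis(LocalDateTime ldt) {
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Inverse of toEpochMillis, again assuming UTC.
    static LocalDateTime fromEpochMillis(long millis) {
        long seconds = Math.floorDiv(millis, 1000L);
        int nanos = (int) Math.floorMod(millis, 1000L) * 1_000_000;
        return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
    }

    // java.sql.Timestamp and LocalDateTime convert field by field,
    // without an explicit offset.
    static Timestamp toSqlTimestamp(LocalDateTime ldt) {
        return Timestamp.valueOf(ldt);
    }

    static LocalDateTime toLocalDateTime(Timestamp ts) {
        return ts.toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime ldt = LocalDateTime.of(2020, 3, 16, 5, 48, 0);
        long millis = toEpochMillis(ldt);
        System.out.println("epoch millis (UTC): " + millis);
        System.out.println("round trip via long: " + fromEpochMillis(millis));
        System.out.println("round trip via java.sql.Timestamp: "
                + toLocalDateTime(toSqlTimestamp(ldt)));
    }
}
```

The point of the sketch is only that going from `LocalDateTime` to `long` needs an explicit rule (here, a fixed offset), which is the kind of conversion the error message reports as unsupported.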

 

[1] https://github.com/pravega/flink-connectors/blob/master/src/test/java/io/pravega/connectors/flink/FlinkPravegaTableITCase.java#L310

 

Best Regards,

Brian

 

From: Jark Wu <[hidden email]>
Sent: Thursday, March 19, 2020 20:25
To: Till Rohrmann
Cc: Zhou, Brian; Timo Walther; Jingsong Li; user
Subject: Re: Need help on timestamp type conversion for Table API on Pravega Connector

 

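This may be a similar issue to [1]; let's continue the discussion there.

Best,

Jark

[1]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Timetamp-types-incompatible-after-migration-to-1-10-td33784.html#a33791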

 

 

 


Re: Need help on timestamp type conversion for Table API on Pravega Connector

Timo Walther
This issue is tracked under:

https://issues.apache.org/jira/browse/FLINK-16693

Could you provide a small reproducible example in the issue? I think
that would help us resolve it quickly in the next minor release.

Thanks,
Timo



RE: Need help on timestamp type conversion for Table API on Pravega Connector

B.Zhou
Hi,

Thanks for the information. I replied in a comment on the issue: https://issues.apache.org/jira/browse/FLINK-16693?focusedCommentId=17065486&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17065486

Best Regards,
Brian
