[Table API] ClassCastException when converting a table to DataStream<Row>

Dongwon Kim-2
Hello,

Consider the following snippet:
    Table sourceTable = getKafkaSource0(tEnv);
    DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
      .map(a -> a)
      .returns(sourceTable.getSchema().toRowType());

    stream.print();
where sourceTable.printSchema() shows:
root
 |-- time1: TimeIndicatorTypeInfo(rowtime)


This program throws the following exception:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
at app.metatron.test.Main2.main(Main2.java:231)
Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
...

The row serializer seems to try to deep-copy an instance of java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
Could anybody help me?

Best,

- Dongwon

p.s. Though removing .returns() makes everything okay, I need it because I want to convert the DataStream<Row> into another table later.
p.s. The source table is created as follows:

private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
    // Kafka connector: read "mytopic" from the earliest offset.
    ConnectorDescriptor connectorDescriptor = new Kafka()
      .version("universal")
      .topic("mytopic")
      .property("bootstrap.servers", "localhost:9092")
      .property("group.id", "mygroup")
      .startFromEarliest();
    // CSV format, deriving its schema from the table schema below.
    FormatDescriptor formatDescriptor = new Csv()
      .deriveSchema()
      .ignoreParseErrors()
      .fieldDelimiter(',');
    // "time1" is a SQL_TIMESTAMP rowtime attribute with timestamps taken from
    // the "rowTime" field and bounded periodic watermarks (100 ms delay).
    Schema schemaDescriptor = new Schema()
      .field("time1", SQL_TIMESTAMP())
      .rowtime(
        new Rowtime()
          .timestampsFromField("rowTime")
          .watermarksPeriodicBounded(100)
      );
    tEnv.connect(connectorDescriptor)
      .withFormat(formatDescriptor)
      .withSchema(schemaDescriptor)
      .inAppendMode()
      .registerTableSource("mysrc");
    return tEnv.scan("mysrc");
  }

Re: [Table API] ClassCastException when converting a table to DataStream<Row>

Rong Rong
Hi Dongwon,

Can you provide a bit more information:
which Flink version are you using?
what does "sourceTable.getSchema().toRowType()" return?
what does the line ".map(a -> a)" do, and can you remove it?
if I am understanding correctly, you are also using "time1" as the rowtime; is your intention to use it later as well?

As far as I know, ".returns(sourceTable.getSchema().toRowType())" only adds a type information hint about the return type of this operator. It is used in cases where Flink cannot determine it automatically [1].

Thanks,
Rong


Re: [Table API] ClassCastException when converting a table to DataStream<Row>

Dongwon Kim-2
Hi Rong,

Thank you for your reply :-)

> which Flink version are you using?
I'm using Flink 1.8.0.

> what does "sourceTable.getSchema().toRowType()" return?
Row(time1: TimeIndicatorTypeInfo(rowtime))

> what does the line ".map(a -> a)" do, and can you remove it?
".map(a -> a)" is just there to illustrate the problem.
My actual code contains a ProcessFunction (instead of the .map() in the snippet) which appends a new field containing the current watermark to each row.
If there were a way to get the watermark inside a scalar UDF, I wouldn't have to convert the table to a DataStream and back.
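
For reference, here is a minimal sketch of what that ProcessFunction does (the class name and the appended field layout are illustrative, not my actual code):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // Copies each incoming row and appends the operator's current watermark
    // as an extra long field at the end.
    public static class AppendWatermark extends ProcessFunction<Row, Row> {
      @Override
      public void processElement(Row in, Context ctx, Collector<Row> out) {
        Row result = new Row(in.getArity() + 1);
        for (int i = 0; i < in.getArity(); i++) {
          result.setField(i, in.getField(i));
        }
        result.setField(in.getArity(), ctx.timerService().currentWatermark());
        out.collect(result);
      }
    }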

> if I am understanding correctly, you are also using "time1" as the rowtime; is your intention to use it later as well?
Yup :-)

> As far as I know, ".returns(sourceTable.getSchema().toRowType())" only adds a type information hint about the return type of this operator. It is used in cases where Flink cannot determine it automatically [1].
The reason why I specify ".returns(sourceTable.getSchema().toRowType())" is indeed to give a type information hint, as you said.
It is needed later when I make another table like
   "Table anotherTable = tEnv.fromDataStream(stream);".
Without the type information hint, I get the error
   "An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo."
That's why I give the type information hint in that way.
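
Put together, the round trip I'm after looks roughly like this (a sketch; the appended field name "wm" and the extended hint are illustrative, since my real ProcessFunction adds a field and the plain schema row type would no longer match):

    import org.apache.flink.api.common.typeinfo.Types;

    DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
      .process(new AppendWatermark())  // the sketch from above
      .returns(Types.ROW_NAMED(
        new String[] {"time1", "wm"},
        Types.SQL_TIMESTAMP, Types.LONG));

    // Without a RowTypeInfo hint, this is the call that fails with the
    // GenericTypeInfo<Row> error quoted above.
    Table anotherTable = tEnv.fromDataStream(stream);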

Best,

Dongwon


Re: [Table API] ClassCastException when converting a table to DataStream<Row>

Rong Rong
Hi Dongwon,

I have to dig deeper into the code to reproduce this error. It looks like a bug to me; I will update once I find anything.

However, from what you explained, if I understand correctly, you can do all of your processing within the Table API without converting back and forth to DataStream.
E.g., if your "map(a -> a)" placeholder represents some sort of map function that's simple enough, you can implement it and connect it with the Table API via a UserDefinedFunction [1].
As the Table API is becoming a first-class citizen [2,3,4], this would be a much cleaner implementation from my perspective.

--
Rong

Re: [Table API] ClassCastException when converting a table to DataStream<Row>

Dongwon Kim-2
Hi Rong,

> I have to dig deeper into the code to reproduce this error. It looks like a bug to me; I will update once I find anything.
Thanks a lot for spending your time on this.

> However, from what you explained, if I understand correctly, you can do all of your processing within the Table API without converting back and forth to DataStream.
> E.g., if your "map(a -> a)" placeholder represents some sort of map function that's simple enough, you can implement it and connect it with the Table API via a UserDefinedFunction [1].
> As the Table API is becoming a first-class citizen [2,3,4], this would be a much cleaner implementation from my perspective.
I agree that a first-class Table API will make everything not only easier but also a lot cleaner.
However, we have some corner cases that force us to convert between Table and DataStream.
One such case is appending to a Table a column showing the current watermark of each record; there's no way around the conversion there, as a ScalarFunction doesn't give access to the runtime context the way a ProcessFunction does.

I have a question regarding the conversion.
Do I have to worry about a runtime performance penalty in cases where I have no choice but to convert back and forth between Table and DataStream?

Best,

Dongwon


Re: [Table API] ClassCastException when converting a table to DataStream<Row>

Fabian Hueske-2
Hi Dongwon,

regarding the question about the conversion: if you keep using the Row type and do not add or remove fields, the conversion is pretty much for free right now.
It will be a MapFunction (sometimes even no function at all) that should be chained with the other operators. Hence, it should boil down to a function call.

Best, Fabian


Re: [Table API] ClassCastException when converting a table to DataStream<Row>

Dongwon Kim-2
Hi Fabian,

Thanks for the clarification :-)
So I can convert back and forth without worrying about it, as I keep using the Row type during the conversion (even though fields are added).

Best,

Dongwon


Re: [Table API] ClassCastException when converting a table to DataStream<Row>

Rong Rong
Hi Dongwon, 

Sorry for the late reply. I did some experiments, and it seems you are right:
setting the type via ".returns()" actually alters the underlying type of the DataStream from a GenericType into a specific RowTypeInfo. Please see the JIRA ticket [1] for more info.

Regarding the approach: yes, I think you cannot access the timer service from the Table/SQL API at the moment, so that might be the best approach.
And as Fabian suggested, I don't think there is much of a problem as long as you are not changing the underlying type info of your DataStream. I will follow up on this in the JIRA ticket.
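
In the meantime, one possible workaround (an untested sketch on my side, and it assumes the legacy planner's TimeIndicatorTypeInfo is on your classpath) is to build the hint yourself and swap the time indicator for a plain SQL timestamp type, so that the RowSerializer picks the timestamp serializer for that field:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;

    TableSchema schema = sourceTable.getSchema();
    TypeInformation<?>[] fieldTypes = schema.getFieldTypes().clone();  // copy before mutating
    for (int i = 0; i < fieldTypes.length; i++) {
      // Time indicators are serialized as longs internally, which clashes with
      // the java.sql.Timestamp instances that toAppendStream produces.
      if (fieldTypes[i] instanceof TimeIndicatorTypeInfo) {
        fieldTypes[i] = Types.SQL_TIMESTAMP;
      }
    }
    RowTypeInfo hint = new RowTypeInfo(fieldTypes, schema.getFieldNames());
    // ...then pass .returns(hint) instead of .returns(sourceTable.getSchema().toRowType())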

--
Rong

