Hello,

Consider the following snippet:

    Table sourceTable = getKafkaSource0(tEnv);

where sourceTable.printSchema() shows:

    root
     |-- time1: TimeIndicatorTypeInfo(rowtime)

This program throws the following exception:

    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

The row serializer seems to try to deep-copy an instance of java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer. Could anybody help me?

Best,

- Dongwon

p.s. Though removing .returns() makes everything okay, I need it because I want to convert the DataStream<Row> into another table later.

p.s. The source table is created as follows:

    private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
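p.p.s. The conversion that fails looks roughly like this (a sketch: I assume toAppendStream here, and the identity map stands in for my real transformation):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    // Table -> DataStream<Row>, a placeholder transformation, and a type
    // hint so the stream can later be turned back into a Table.
    DataStream<Row> stream = tEnv
            .toAppendStream(sourceTable, Row.class)
            .map(a -> a)
            .returns(sourceTable.getSchema().toRowType());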
Hi Dongwon,

Can you provide a bit more information:
- Which Flink version are you using?
- What does "sourceTable.getSchema().toRowType()" return?
- What does the line ".map(a -> a)" do, and can you remove it?
- If I understand correctly, you are also using "time1" as the rowtime; is your intention to use it later as well?

As far as I know, ".returns(sourceTable.getSchema().toRowType());" only adds a type information hint about the return type of this operator. It is used in cases where Flink cannot determine the type automatically [1].

Thanks,
Rong
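For reference, the typical case where such a hint is needed looks roughly like this (a sketch; "text" stands for some DataStream<String>):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Java lambdas erase generic type parameters, so Flink cannot infer
    // Tuple2<String, Integer> here and needs an explicit hint:
    DataStream<Tuple2<String, Integer>> counts = text
            .map(line -> Tuple2.of(line, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT));

On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <[hidden email]> wrote: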
Hi Rong,

Thank you for your reply :-)

> which Flink version are you using?

I'm using Flink 1.8.0.

> what does "sourceTable.getSchema().toRowType()" return?

Row(time1: TimeIndicatorTypeInfo(rowtime))

> what does the line ".map(a -> a)" do, and can you remove it?

".map(a -> a)" is just there to illustrate the problem. My actual code contains a process function (instead of .map() in the snippet) which appends a new field containing the watermark to each row. If there were a way to get the watermark inside a scalar UDF, I wouldn't convert the table to a datastream and back.

> is your intention to use "time1" as the rowtime later as well?

Yup :-)

> As far as I know, ".returns(sourceTable.getSchema().toRowType());" only adds a type information hint about the return type of this operator.

The reason why I specify ".returns(sourceTable.getSchema().toRowType());" is exactly to give such a type information hint, as you said. It is needed later when I make another table like "Table anotherTable = tEnv.fromDataStream(stream);". Without the type information hint, I get the error "An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo." That's why I give the type information hint in that way.

Best,

Dongwon
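For reference, the process function I mentioned looks roughly like this (a sketch; the class name and the arity-based copy are illustrative):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // Appends the operator's current watermark as an extra field to each Row.
    // The timer service exposing the watermark is available here, but not in
    // a ScalarFunction, which is why I convert to a DataStream at all.
    public class AppendWatermarkFunction extends ProcessFunction<Row, Row> {
        @Override
        public void processElement(Row in, Context ctx, Collector<Row> out) {
            Row result = new Row(in.getArity() + 1);
            for (int i = 0; i < in.getArity(); i++) {
                result.setField(i, in.getField(i));
            }
            result.setField(in.getArity(), ctx.timerService().currentWatermark());
            out.collect(result);
        }
    }

On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <[hidden email]> wrote: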
Hi Dongwon,

I have to dig deeper into the code to reproduce this error. This seems to be a bug to me, and I will update once I find anything.

However, from what you explained, if I understand correctly, you can do all of your processing within the Table API scope without converting back and forth to DataStream. E.g., if your "map(a -> a)" placeholder represents some sort of map function that's simple enough, you can implement it and connect it with the Table API via a UserDefinedFunction [1]. As the Table API is becoming a first-class citizen [2,3,4], this would be a much cleaner implementation from my perspective.

--
Rong
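For example, a simple per-record transformation could be expressed roughly like this (a sketch; the class, method, and registered names are hypothetical):

    import java.sql.Timestamp;
    import org.apache.flink.table.functions.ScalarFunction;

    // Illustrative scalar UDF: a simple per-field transformation kept
    // entirely inside the Table API, with no DataStream round trip.
    public static class MyScalarFunc extends ScalarFunction {
        public long eval(Timestamp t) {
            return t.getTime();  // placeholder for the real logic
        }
    }

    // Registration and use:
    tEnv.registerFunction("myScalarFunc", new MyScalarFunc());
    Table result = sourceTable.select("time1, myScalarFunc(time1)");

On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <[hidden email]> wrote: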
Hi Rong,

> I have to dig deeper into the code to reproduce this error. This seems to be a bug to me, and I will update once I find anything.

Thanks a lot for spending your time on this.

> However, from what you explained, if I understand correctly, you can do all of your processing within the Table API scope without converting back and forth to DataStream.

I agree with you that a first-class-citizen Table API will make everything not only easier but also a lot cleaner. However, we have some corner cases that force us to convert between Table and DataStream. One such case is appending to a Table a column showing the current watermark of each record; there's no other way to do that, as ScalarFunction doesn't give us access to the runtime context the way ProcessFunction does.

I have a question regarding the conversion: do I have to worry about a runtime performance penalty in cases where I cannot help but convert back and forth to DataStream?

Best,

Dongwon

On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <[hidden email]> wrote:
Hi Dongwon,

Regarding the question about the conversion: if you keep using the Row type and do not add or remove fields, the conversion is pretty much for free right now. It will be a MapFunction (sometimes even no function at all) that should be chained with the other operators. Hence, it should boil down to a function call.

Best,
Fabian
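For illustration, a full round trip in your case might look roughly like this (a sketch; it borrows the hypothetical AppendWatermarkFunction from earlier in the thread, and the field name "wm" is made up):

    import java.util.Arrays;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    // Extend the source schema by one LONG field for the appended watermark.
    TypeInformation<?>[] types = Arrays.copyOf(
            sourceTable.getSchema().getFieldTypes(),
            sourceTable.getSchema().getFieldTypes().length + 1);
    String[] names = Arrays.copyOf(
            sourceTable.getSchema().getFieldNames(),
            sourceTable.getSchema().getFieldNames().length + 1);
    types[types.length - 1] = Types.LONG;
    names[names.length - 1] = "wm";
    // n.b. if the rowtime field's TimeIndicatorTypeInfo triggers the
    // serializer issue discussed above, replacing it in `types` with
    // Types.SQL_TIMESTAMP may help.

    // The conversions themselves are (at most) chained MapFunctions, so the
    // round trip boils down to a function call per record.
    DataStream<Row> stream = tEnv
            .toAppendStream(sourceTable, Row.class)
            .process(new AppendWatermarkFunction())
            .returns(new RowTypeInfo(types, names));

    Table extended = tEnv.fromDataStream(stream);

Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <[hidden email]>: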
Hi Fabian,

Thanks for the clarification :-)

Now I can convert back and forth without worrying about it, as I keep using the Row type during the conversion (even though fields are added).

Best,

Dongwon

On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske <[hidden email]> wrote:
Hi Dongwon,

Sorry for the late reply. I did try some experiments, and it seems like you are right: setting the `.returns()` type actually alters the underlying type of the DataStream from a GenericType into a specific RowTypeInfo. Please see the JIRA ticket [1] for more info.

Regarding the approach: yes, I think you cannot access the timer service from the Table/SQL API at this moment, so that might be the best approach. And as Fabian suggested, I don't think there's too much of a problem as long as you are not changing the type info underlying your DataStream.

I will follow up on this in the JIRA ticket.

--
Rong
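To illustrate what I saw in the experiment (a sketch; the printed values are what I would expect, not verbatim output):

    // Without the hint, the lambda map() erases the type information and
    // the stream falls back to a generic type:
    DataStream<Row> withoutHint = tEnv
            .toAppendStream(sourceTable, Row.class)
            .map(a -> a);
    System.out.println(withoutHint.getType());
    // prints something like: GenericType<org.apache.flink.types.Row>

    // With the hint, the stream carries the specific RowTypeInfo:
    DataStream<Row> withHint = tEnv
            .toAppendStream(sourceTable, Row.class)
            .map(a -> a)
            .returns(sourceTable.getSchema().toRowType());
    System.out.println(withHint.getType());
    // prints something like: Row(time1: TimeIndicatorTypeInfo(rowtime))

On Tue, Jul 23, 2019 at 6:30 AM Dongwon Kim <[hidden email]> wrote: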