Hi,
I have a source that generates events with timestamps. These flow nicely, until encountering a conversion from Table -> DataStream[Row]: def toRowRetractStream(implicit ev: TypeInformation[Row]): DataStream[Row] = table .toRetractStream[Row] .flatMap { (row, collector: Collector[Row]) => if (row._1) collector.collect(row._2) } The transformation causes a SinkConversion to be generated with the following code: @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue(); Object[] fields$12 = new Object[2]; fields$12[0] = org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg(in1); fields$12[1] = (org.apache.flink.types.Row) converter$9.toExternal((org.apache.flink.table.data.RowData) in1); scala.Tuple2 result$10 = (scala.Tuple2) serializer$11.createInstance(fields$12); output.collect(outElement.replace(result$10)); } The code receives an element of type StreamRecord, which does have a timestamp attached to it, but fails to forward it to the new element (outElement) which is initialized as: private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); Am I missing anything in the Table -> DataStream[Row] conversion that should make the timestamp follow through? or is this a bug? Best Regards, Yuval Itzchakov. |
Hey Yuval, Could it be that you are hitting this bug[1], which has been fixed recently? Best, Dawid [1] https://issues.apache.org/jira/browse/FLINK-21013 On 15/02/2021 08:20, Yuval Itzchakov
wrote:
signature.asc (849 bytes) Download Attachment |
Hi Dawid, Yes, looks like it. Thanks! Is there an ETA on 1.12.2 yet? On Mon, Feb 15, 2021 at 9:48 AM Dawid Wysakowicz <[hidden email]> wrote:
Best Regards, Yuval Itzchakov. |
The best I can do is point you to the thread[1]. I am also cc'ing Yuan who is the release manager for 1.12.2. Best, Dawid On 15/02/2021 08:51, Yuval Itzchakov
wrote:
signature.asc (849 bytes) Download Attachment |
Free forum by Nabble | Edit this page |