Generated SinkConversion code ignores incoming StreamRecord timestamp

Yuval Itzchakov
Hi,

I have a source that generates events with timestamps. The timestamps flow through fine until the events hit a Table -> DataStream[Row] conversion:

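    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala.DataStream
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // `table` is a Table in scope; keep only accumulate (true) messages and unwrap the Row
    def toRowRetractStream(implicit ev: TypeInformation[Row]): DataStream[Row] =
      table
        .toRetractStream[Row]
        .flatMap { (row, collector: Collector[Row]) =>
          if (row._1)
            collector.collect(row._2)
        }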
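
As a rough illustration of what the flatMap above does with the retract stream, here is a plain-Java sketch. It models each retract message as a hypothetical (Boolean, row) entry instead of Flink's (Boolean, Row) tuples; the names RetractFilterSketch, msg and keepAccumulates are made up for this example. Entries flagged true (accumulate) are kept and entries flagged false (retract) are dropped.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RetractFilterSketch {

    // Hypothetical helper: builds one retract-stream message (flag, row).
    static <T> Map.Entry<Boolean, T> msg(boolean accumulate, T row) {
        return new SimpleEntry<>(accumulate, row);
    }

    // Keeps the payload of every accumulate message (flag == true) and
    // silently drops retract messages (flag == false), like the flatMap above.
    static <T> List<T> keepAccumulates(List<Map.Entry<Boolean, T>> retractStream) {
        List<T> out = new ArrayList<>();
        for (Map.Entry<Boolean, T> m : retractStream) {
            if (m.getKey()) {
                out.add(m.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> stream = List.of(
                msg(true, "a"),
                msg(true, "b"),
                msg(false, "a"), // retraction of "a"
                msg(true, "c"));
        System.out.println(keepAccumulates(stream)); // prints [a, b, c]
    }
}
```

A retraction is discarded rather than applied, so "a" still appears in the output even though it was later retracted.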

The transformation causes a SinkConversion to be generated with the following code:

        @Override
        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();

          Object[] fields$12 = new Object[2];
          fields$12[0] = org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg(in1);
          fields$12[1] = (org.apache.flink.types.Row) converter$9.toExternal((org.apache.flink.table.data.RowData) in1);
          scala.Tuple2 result$10 = (scala.Tuple2) serializer$11.createInstance(fields$12);
          output.collect(outElement.replace(result$10));
        }

The operator receives a StreamRecord that does carry a timestamp, but it never copies that timestamp to the reused output record (outElement), which is initialized without one:

        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
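
To make the reported behaviour concrete, here is a minimal, self-contained Java model of the reuse pattern in the generated operator. Record is a hypothetical, simplified stand-in for Flink's StreamRecord (its two replace methods loosely mirror StreamRecord.replace(value) and StreamRecord.replace(value, timestamp)), and convertKeepingTimestamp is only an illustrative variant, not the planner's actual code.

```java
public class TimestampForwardingSketch {

    // Hypothetical, simplified stand-in for Flink's StreamRecord: a mutable
    // value holder with an optional timestamp. This is NOT the real class.
    static final class Record {
        Object value;
        long timestamp;
        boolean hasTimestamp;

        Record(Object value) {
            this.value = value;
        }

        Record(Object value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = true;
        }

        // Swap the value; this record's own timestamp state is left untouched.
        Record replace(Object newValue) {
            this.value = newValue;
            return this;
        }

        // Swap the value and attach the given timestamp.
        Record replace(Object newValue, long ts) {
            this.value = newValue;
            this.timestamp = ts;
            this.hasTimestamp = true;
            return this;
        }

        void eraseTimestamp() {
            this.hasTimestamp = false;
        }
    }

    // One reused output record, created without a timestamp, like the
    // generated operator's outElement field.
    private final Record outElement = new Record(null);

    // Shape of the generated code: the input's timestamp is never read.
    Record convertDroppingTimestamp(Record in) {
        return outElement.replace("converted:" + in.value);
    }

    // Illustrative timestamp-preserving variant: copy the input timestamp
    // onto the reused record, or erase it when the input has none.
    Record convertKeepingTimestamp(Record in) {
        Object converted = "converted:" + in.value;
        if (in.hasTimestamp) {
            return outElement.replace(converted, in.timestamp);
        }
        outElement.eraseTimestamp();
        return outElement.replace(converted);
    }

    public static void main(String[] args) {
        Record input = new Record("row", 1000L);

        Record dropped = new TimestampForwardingSketch().convertDroppingTimestamp(input);
        System.out.println(dropped.hasTimestamp); // prints false

        Record kept = new TimestampForwardingSketch().convertKeepingTimestamp(input);
        System.out.println(kept.hasTimestamp + " " + kept.timestamp); // prints true 1000
    }
}
```

Because outElement is reused across calls, a timestamp-preserving version also has to clear the timestamp when an input has none; otherwise a stale timestamp from an earlier record would leak onto later ones.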

Am I missing something in the Table -> DataStream[Row] conversion that would make the timestamp carry through, or is this a bug?

--
Best Regards,
Yuval Itzchakov.

Re: Generated SinkConversion code ignores incoming StreamRecord timestamp

Dawid Wysakowicz-2

Hey Yuval,

Could it be that you are hitting this bug [1], which was fixed recently?

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-21013

On 15/02/2021 08:20, Yuval Itzchakov wrote:

Re: Generated SinkConversion code ignores incoming StreamRecord timestamp

Yuval Itzchakov
Hi Dawid,
Yes, looks like it. Thanks!

Is there an ETA on 1.12.2 yet?

On Mon, Feb 15, 2021 at 9:48 AM Dawid Wysakowicz <[hidden email]> wrote:

Re: Generated SinkConversion code ignores incoming StreamRecord timestamp

Dawid Wysakowicz-2

The best I can do is point you to the release discussion thread [1].

I am also cc'ing Yuan, who is the release manager for 1.12.2.

Best,

Dawid

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Apache-Flink-1-12-2-td48603.html

On 15/02/2021 08:51, Yuval Itzchakov wrote: