Setting writeTimestampToKafka from Kafka table descriptor

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Setting writeTimestampToKafka from Kafka table descriptor

Steve Whelan
Examining the org.apache.flink.table.descriptors.Kafka class in Flink v1.9, it seems to not have the ability to set whether the Kafka producer should attach a timestamp to the message. The FlinkKafkaProducer class has a setter for controlling this producer attribute. 

Can/should this attribute be added to the table descriptor? I assume it was left out because the table descriptor wants to maintain backwards compatibility with older versions of Kafka where this attribute is not supported.
Reply | Threaded
Open this post in threaded view
|

Re: Setting writeTimestampToKafka from Kafka table descriptor

Jingsong Li
Hi Steve,

There are some discussion in [1], this has been considered, but it is not supported in the current version.

From Fabian's word:

> I think timestamp fields of source-sink tables should be handled as follows when emitting the table:

  • proc-time: ignore
  • from-field: simply write out the timestamp as part of the row.
  • from-source: write the timestamp separately to the system and remove it from the row. This only works if we can set the timestamp to the sink system. If the system sets the ingestion timestamp by it own, i.e., not the actual value, rows would contain different timestamps when they are ingested. If the sink system does not support to set a timestamp, we cannot allow such a table definition.

FYI.


Best,
Jingsong Lee

On Mon, Dec 30, 2019 at 9:09 AM Steve Whelan <[hidden email]> wrote:
Examining the org.apache.flink.table.descriptors.Kafka class in Flink v1.9, it seems to not have the ability to set whether the Kafka producer should attach a timestamp to the message. The FlinkKafkaProducer class has a setter for controlling this producer attribute. 

Can/should this attribute be added to the table descriptor? I assume it was left out because the table descriptor wants to maintain backwards compatibility with older versions of Kafka where this attribute is not supported.


--
Best, Jingsong Lee