Hi,

I'm creating a Kafka producer with timestamps enabled, following the instructions at https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer

    Optional<FlinkKafkaPartitioner<T>> customPartitioner = Optional.empty();
    FlinkKafkaProducer011<T> result = new FlinkKafkaProducer011<>(defaultTopic, serializationSchema, properties, customPartitioner);
    result.setWriteTimestampToKafka(true);

but I'm getting an exception:

    java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp should always be non-negative or null.
        at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70)
        at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:642)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

Is there anything else I need to configure to embed timestamp information into the resulting Kafka message?

Thank you,
Alex
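
P.S. For reference, the rule stated in the exception message can be sketched without Kafka or Flink on the classpath. This is only a stand-in based on the message text above, not the actual library source; the names `TimestampCheck`, `validate` and `sanitize` are hypothetical. It suggests that the record reaching the sink carries -1 as its timestamp instead of null, and `sanitize` shows one possible guard (an assumption on my part, not a documented fix).

```java
final class TimestampCheck {

    // Stand-in for the rule in the exception message:
    // null is accepted, negative values are rejected.
    static Long validate(Long timestamp) {
        if (timestamp != null && timestamp < 0) {
            throw new IllegalArgumentException(String.format(
                "Invalid timestamp: %d. Timestamp should always be non-negative or null.",
                timestamp));
        }
        return timestamp;
    }

    // Hypothetical guard: treat a negative value as "no timestamp".
    static Long sanitize(Long timestamp) {
        return (timestamp != null && timestamp < 0) ? null : timestamp;
    }

    public static void main(String[] args) {
        // A sanitized -1 becomes null and passes the check.
        System.out.println(validate(sanitize(-1L)));
        // A regular epoch-millis value passes through unchanged.
        System.out.println(validate(sanitize(1500000000000L)));
        // An unsanitized -1 fails with the message from the stack trace.
        try {
            validate(-1L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If that reading is right, the question becomes where the -1 comes from upstream of the sink, for example elements that never had a timestamp assigned.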