Hi,

I'm creating a Kafka producer with timestamps enabled, following the instructions at https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer
but I'm getting an exception:

java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp should always be non-negative or null.
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70)
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:642)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

Is there anything else I need to configure to embed timestamp information into the resulting Kafka message?

Thank you,
Alex
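For reference, the rule stated in the exception message (a timestamp must be non-negative or null) can be sketched in plain Java as below. This is only an illustration of the constraint, not the Kafka or Flink source: the class `TimestampGuard` and both helper methods are hypothetical names, and treating a negative value as "no timestamp assigned" is an assumption based on the -1 in the message.

```java
public class TimestampGuard {

    /** Rule from the exception message: valid if null or non-negative. */
    static boolean isValidKafkaTimestamp(Long timestamp) {
        return timestamp == null || timestamp >= 0;
    }

    /**
     * Hypothetical helper: maps an invalid (negative) timestamp to null,
     * on the assumption that a negative value means "no timestamp assigned".
     */
    static Long toKafkaTimestamp(Long timestamp) {
        return isValidKafkaTimestamp(timestamp) ? timestamp : null;
    }

    public static void main(String[] args) {
        // -1 is the value reported in the stack trace above.
        System.out.println("valid(-1)   = " + isValidKafkaTimestamp(-1L));
        System.out.println("mapped(-1)  = " + toKafkaTimestamp(-1L));
        // A non-negative epoch-millisecond value passes through unchanged.
        System.out.println("mapped(now) = " + toKafkaTimestamp(System.currentTimeMillis()));
    }
}
```

Whether substituting null is acceptable depends on the job. It would mean the record carries no event timestamp of its own, so checking that the stream actually has timestamps assigned before it reaches the sink is probably the first thing to verify.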
Hi,

I think the problem is the SerializationSchema parameter.

Best,
Qingxiang Ma

2018-04-26 20:59 GMT+08:00 Alexander Smirnov <[hidden email]>: