Can't send kafka message with timestamp

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Can't send kafka message with timestamp

Alexander Smirnov
Hi,

I'm creating kafka producer with timestamps enabled following instructions at https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer

        Optional<FlinkKafkaPartitioner<T>> customPartitioner = Optional.empty();

        FlinkKafkaProducer011<T> result = new FlinkKafkaProducer011<>(defaultTopic, serializationSchema, properties, customPartitioner);

result.setWriteTimestampToKafka(true);



but getting an exception:

java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp should always be non-negative or null.
	at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70)
	at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:642)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)



Is there anything else I need to configure to embed timestamp information into resulting kafka message?

Thank you,
Alex
Reply | Threaded
Open this post in threaded view
|

Re: Can't send kafka message with timestamp

Marvin777
Hi,

I think the problem is the  SerializationSchema parameter.

Best, 
QIngxiang Ma.

2018-04-26 20:59 GMT+08:00 Alexander Smirnov <[hidden email]>:
Hi,

I'm creating kafka producer with timestamps enabled following instructions at https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer

        Optional<FlinkKafkaPartitioner<T>> customPartitioner = Optional.empty();

        FlinkKafkaProducer011<T> result = new FlinkKafkaProducer011<>(defaultTopic, serializationSchema, properties, customPartitioner);

result.setWriteTimestampToKafka(true);



but getting an exception:

java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp should always be non-negative or null.
	at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70)
	at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:642)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)



Is there anything else I need to configure to embed timestamp information into resulting kafka message?

Thank you,
Alex