Doubt Regarding producing to kafka using flink

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Doubt Regarding producing to kafka using flink

Archit Mittal
Hi 

I am using flink-connector-kafka-0.10_2.10

while producing i am getting error as 

java.lang.IllegalArgumentException: Invalid timestamp -9223372036854775808
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:249) ~[flink-connector-kafka-0.10_2.10-1.2.0.jar:1.2.0]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:345) 

how do i put timestamp in my object before producing ?

Thanks
Archit
Reply | Threaded
Open this post in threaded view
|

Re: Doubt Regarding producing to kafka using flink

Tzu-Li (Gordon) Tai
Hi Archit!

You’ll need to assign timestamps to the records in your stream before producing them to Kafka (i.e. before the FlinkKafkaProducer operator).
Have a look at [1] and [2] on how to do that. Feel free to ask further questions if you bump into any!

Cheers,
Gordon


On April 2, 2017 at 6:38:13 PM, Archit Mittal ([hidden email]) wrote:

Hi 

I am using flink-connector-kafka-0.10_2.10

while producing i am getting error as 

java.lang.IllegalArgumentException: Invalid timestamp -9223372036854775808
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:249) ~[flink-connector-kafka-0.10_2.10-1.2.0.jar:1.2.0]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:345) 

how do i put timestamp in my object before producing ?

Thanks
Archit
Reply | Threaded
Open this post in threaded view
|

Re: Doubt Regarding producing to kafka using flink

Archit Mittal
Hi Gordon
This is the function snippet i am using but i am getting invalid timestamp        
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "word");
properties.setProperty("auto.offset.reset", "earliest");


DataStream < WordCount > stream = env.fromElements(wordCount);
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<WordCount>() {
@Override
public long extractAscendingTimestamp(WordCount element) {
return DateTime.now().getMillis();
}
});


FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, KAFKA_TOPIC, new WordCountSchema(), properties);
config.setWriteTimestampToKafka(true);

env.execute(
"job");

On Mon, Apr 3, 2017 at 8:20 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Archit!

You’ll need to assign timestamps to the records in your stream before producing them to Kafka (i.e. before the FlinkKafkaProducer operator).
Have a look at [1] and [2] on how to do that. Feel free to ask further questions if you bump into any!

Cheers,
Gordon


On April 2, 2017 at 6:38:13 PM, Archit Mittal ([hidden email]) wrote:

Hi 

I am using flink-connector-kafka-0.10_2.10

while producing i am getting error as 

java.lang.IllegalArgumentException: Invalid timestamp -9223372036854775808
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:249) ~[flink-connector-kafka-0.10_2.10-1.2.0.jar:1.2.0]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:345) 

how do i put timestamp in my object before producing ?

Thanks
Archit

Reply | Threaded
Open this post in threaded view
|

Re: Doubt Regarding producing to kafka using flink

Tzu-Li (Gordon) Tai
Hi Archit,

The problem is that you need to assign the returned `DataStream` from `stream.assignTimestampsAndWatermarks` to a separate variable, and use that when instantiating the Kafka 0.10 sink.
The `assignTimestampsAndWatermarks` method returns a new `DataStream` instance with records that have assigned timestamps. Calling it does not affect the original `DataStream` instance.

Cheers,
Gordon

On April 3, 2017 at 5:15:03 PM, Archit Mittal ([hidden email]) wrote:

Hi Gordon
This is the function snippet i am using but i am getting invalid timestamp        
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "word");
properties.setProperty("auto.offset.reset", "earliest");


DataStream < WordCount > stream = env.fromElements(wordCount);
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<WordCount>() {
@Override
public long extractAscendingTimestamp(WordCount element) {
return DateTime.now().getMillis();
}
});


FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, KAFKA_TOPIC, new WordCountSchema(), properties);
config.setWriteTimestampToKafka(true);

env.execute(
"job");

On Mon, Apr 3, 2017 at 8:20 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Archit!

You’ll need to assign timestamps to the records in your stream before producing them to Kafka (i.e. before the FlinkKafkaProducer operator).
Have a look at [1] and [2] on how to do that. Feel free to ask further questions if you bump into any!

Cheers,
Gordon


On April 2, 2017 at 6:38:13 PM, Archit Mittal ([hidden email]) wrote:

Hi 

I am using flink-connector-kafka-0.10_2.10

while producing i am getting error as 

java.lang.IllegalArgumentException: Invalid timestamp -9223372036854775808
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:249) ~[flink-connector-kafka-0.10_2.10-1.2.0.jar:1.2.0]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:345) 

how do i put timestamp in my object before producing ?

Thanks
Archit