Not able to Assign Watermark in Flink 1.11

anuj.aj07

I am getting this error when trying to assign a watermark in Flink 1.11:

"Cannot resolve method 'withTimestampAssigner(anonymous org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<org.apache.avro.generic.GenericRecord>)'"

FlinkKafkaConsumer<GenericRecord> bookingFlowConsumer = new FlinkKafkaConsumer(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

bookingFlowConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(15))
        .withTimestampAssigner(new SerializableTimestampAssigner<GenericRecord>() {
            @Override
            public long extractTimestamp(GenericRecord genericRecord, long l) {
                return (long) genericRecord.get("event_ts");
            }
        }));

What is wrong with this?

In Flink 1.9 I was using this class, and it was working fine:

public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {

    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        long timestamp = (long) record.get("event_ts");
        // LOGGER.info("timestamp----", timestamp);
        return timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
        // simply emit a watermark with every event
        // LOGGER.info("extractedTimestamp ", extractedTimestamp);
        return new Watermark(extractedTimestamp);
    }
}

--
Thanks & Regards,
Anuj Jain




Re: Not able to Assign Watermark in Flink 1.11

r_khachatryan
Hi Anuj Jain,

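You need to provide the type parameter explicitly when calling WatermarkStrategy.forBoundedOutOfOrderness. Java does not use the chained withTimestampAssigner call to infer it, so without the explicit parameter the strategy is typed as WatermarkStrategy<Object> and your assigner for GenericRecord does not match. Change the start of the call like this and leave the rest as it is:

bookingFlowConsumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofMinutes(15))

Regards,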
Roman
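
For anyone who wants to reproduce the inference problem without a Flink dependency, here is a minimal sketch in plain Java. Everything in it (TypeWitnessDemo, Event, TimestampAssigner, Strategy, buildStrategy, extract) is a hypothetical stand-in that only mirrors the shape of the WatermarkStrategy call chain; none of it is Flink's actual code. It assumes the relevant detail is a generic static factory whose argument does not mention T, followed by a chained method that does.

```java
import java.time.Duration;

public class TypeWitnessDemo {

    // Hypothetical stand-in for a record type such as GenericRecord.
    static final class Event {
        final long eventTs;

        Event(long eventTs) {
            this.eventTs = eventTs;
        }
    }

    // Hypothetical stand-in for a timestamp assigner interface.
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Hypothetical stand-in for a strategy with a generic static factory.
    static final class Strategy<T> {
        final Duration maxOutOfOrderness;
        final TimestampAssigner<T> assigner;

        private Strategy(Duration maxOutOfOrderness, TimestampAssigner<T> assigner) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            this.assigner = assigner;
        }

        // The argument says nothing about T, so T can only come from the
        // assignment target or from an explicit type argument.
        static <T> Strategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
            return new Strategy<>(maxOutOfOrderness, (element, recordTimestamp) -> recordTimestamp);
        }

        Strategy<T> withTimestampAssigner(TimestampAssigner<T> assigner) {
            return new Strategy<>(maxOutOfOrderness, assigner);
        }
    }

    static Strategy<Event> buildStrategy() {
        // Without <Event> the receiver of the chained call is Strategy<Object>,
        // and the TimestampAssigner<Event> argument below is rejected:
        //   Strategy.forBoundedOutOfOrderness(Duration.ofMinutes(15))
        //           .withTimestampAssigner(new TimestampAssigner<Event>() { ... });
        return Strategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(15))
                .withTimestampAssigner(new TimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long recordTimestamp) {
                        return event.eventTs;
                    }
                });
    }

    // Runs the assigner from the built strategy on a single event.
    static long extract(long eventTs) {
        return buildStrategy().assigner.extractTimestamp(new Event(eventTs), -1L);
    }

    public static void main(String[] args) {
        System.out.println(extract(42L)); // prints 42
    }
}
```

Uncommenting the variant without <Event> should give a compile error of the same kind as the one reported in this thread. An alternative that avoids the explicit type argument is to assign the result of the factory call to a typed local variable first and call withTimestampAssigner on that variable.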


(In reply to the message sent by aj on Fri, Aug 28, 2020 at 6:49 AM, shown in full at the top of this thread.)