Hi,
I have written a program which reads data from Kafka, parses the JSON and does some reduce operation. The problem I am facing is: the program executes perfectly the first time it is run on a day, but when I kill the program and execute it again, an empty file is created. Even after compiling again and running, an empty file is created.

    var kafkaConsumer = new FlinkKafkaConsumer08(
      params.getRequired("input-topic"),
      new SimpleStringSchema,
      params.getProperties)

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    var messageStream = env.addSource(kafkaConsumer).filter(t => t.contains(pattern))

    var mts = messageStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
      var ts = Long.MinValue

      override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
        var timestamp = json_decode(element).toLong
        ts = Math.max(timestamp, previousElementTimestamp)
        timestamp
      }

      override def getCurrentWatermark(): Watermark = {
        new Watermark(ts)
      }
    })

    var output = mts
      .keyBy(t => json_decode(t))
      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
      .allowedLateness(Time.seconds(5))
      .reduce((v1, v2) => v1 + "----" + v2)

    output.writeAsText(path).setParallelism(1)

I am using the FileSystem state backend. I am assuming this problem is related to memory cleaning, but I don't understand what's happening. Any help?

Rahul Raj
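For reference, a minimal sketch of the "FileSystem as state backend" setup mentioned above, assuming env is the StreamExecutionEnvironment from the snippet; the checkpoint directory and interval are illustrative placeholders, not values from the original program:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend

    // Placeholder checkpoint directory, chosen for illustration only.
    env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"))
    // Checkpoint periodically so operator state (including the Kafka offsets) is snapshotted.
    env.enableCheckpointing(60000)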
Hi,
I think the problem is that your Kafka consumer has the same group-id across those two runs. This means that it will pick up the last "read position" of the previous run, and thus not read anything. If you change the group-id for the second run you should be able to read your data again.

Best,
Aljoscha
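A minimal sketch of what that change could look like, assuming the consumer is built from the same ParameterTool properties as in the original snippet (the group id value here is just an example):

    val props = params.getProperties
    // Use a fresh consumer group for this run so the committed offsets of the
    // previous run are not picked up; "my-job-run-2" is an example value.
    props.setProperty("group.id", "my-job-run-2")

    val kafkaConsumer = new FlinkKafkaConsumer08[String](
      params.getRequired("input-topic"),
      new SimpleStringSchema,
      props)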
Changing the group id didn't work for me; using setStartFromEarliest() on the Kafka consumer worked instead. But it created one confusion: in case of failure, if I start from a particular checkpoint or savepoint, will the application start reading messages from the offset where the checkpoint/savepoint was created, or will it start reading from the first record in the Kafka partition?

Rahul Raj
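For reference, a minimal sketch of the start-position setting mentioned above, with the consumer constructed as in the original snippet:

    val kafkaConsumer = new FlinkKafkaConsumer08[String](
      params.getRequired("input-topic"),
      new SimpleStringSchema,
      params.getProperties)
    // Ignore any committed group offsets and always start from the earliest record.
    // This only applies to fresh starts, not to restores from a checkpoint/savepoint
    // (see the reply below).
    kafkaConsumer.setStartFromEarliest()

    val messageStream = env.addSource(kafkaConsumer)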
On 11 October 2017 at 15:44, Aljoscha Krettek <[hidden email]> wrote:
Hi,
When you are restoring from a savepoint (or checkpoint), the offsets committed to Kafka are completely ignored. Flink checkpoints the offsets at the time the checkpoint/savepoint is taken, and those will be used as the read offsets when restoring.

Best,
Aljoscha
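A minimal sketch of how those pieces fit together, assuming env is the StreamExecutionEnvironment from the earlier snippet; the interval, job id, paths and jar name below are placeholders:

    // Enable periodic checkpointing so the consumer's current offsets are
    // snapshotted as part of Flink's state.
    env.enableCheckpointing(60000)

    // Take a savepoint while the job is running:
    //   bin/flink savepoint <jobId> hdfs:///savepoints
    //
    // Resume from it later; the offsets stored in the savepoint take precedence
    // over start-position settings such as setStartFromEarliest():
    //   bin/flink run -s hdfs:///savepoints/savepoint-xxxx myJob.jar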