Hi,

I have written a program that reads data from Kafka, parses the JSON and performs a reduce operation. The problem is that the program runs correctly the first time on a given day, but when I kill it and run it again, an empty output file is created. Even after recompiling and running it again, the file is empty.

    val kafkaConsumer = new FlinkKafkaConsumer08[String](
      params.getRequired("input-topic"),
      new SimpleStringSchema(),
      params.getProperties)

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val messageStream = env.addSource(kafkaConsumer).filter(t => t.contains(pattern))

    val mts = messageStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
      // highest event timestamp seen so far, emitted as the watermark
      var ts = Long.MinValue

      override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
        val timestamp = json_decode(element).toLong
        ts = Math.max(ts, timestamp)
        timestamp
      }

      override def getCurrentWatermark(): Watermark = new Watermark(ts)
    })

    val output = mts
      .keyBy(t => json_decode(t))
      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
      .allowedLateness(Time.seconds(5))
      .reduce((v1, v2) => v1 + "----" + v2)

    output.writeAsText(path).setParallelism(1)

I am using the FileSystem state backend. I suspect the problem is related to state or memory cleanup, but I don't understand what is happening. Any help?

Rahul Raj
I think the problem is that your Kafka consumer uses the same group id in both runs. This means it picks up the last "read position" committed by the previous run and therefore does not read anything. If you change the group id for the second run, you should be able to read your data again.

Best,
Aljoscha
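A minimal sketch of giving each run its own consumer group, assuming the consumer properties are built in code rather than passed through `params.getProperties` as in the question. The helper name `freshConsumerProps`, the addresses and the prefix are hypothetical. One caveat worth checking: with the Kafka 0.8 consumer, a group that has no committed offsets falls back to `auto.offset.reset`, whose default (`largest`) starts at the end of the topic, so a new group id on its own may still read nothing unless that setting is `smallest`.

```scala
import java.util.{Properties, UUID}

object FreshGroupId {
  // Hypothetical helper: builds properties for a Kafka 0.8 consumer with a
  // unique group.id per run, so no offsets committed by an earlier run apply.
  def freshConsumerProps(zookeeper: String, brokers: String, prefix: String): Properties = {
    val props = new Properties()
    props.setProperty("zookeeper.connect", zookeeper) // used by the 0.8 consumer
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("group.id", s"$prefix-${UUID.randomUUID()}")
    // A brand-new group has no committed offsets; "smallest" makes it start at
    // the beginning of each partition instead of the 0.8 default "largest".
    props.setProperty("auto.offset.reset", "smallest")
    props
  }
}
```

The resulting `Properties` would be passed as the third constructor argument of `FlinkKafkaConsumer08` in place of `params.getProperties`.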
Changing the group id didn't work for me; instead, calling setStartFromEarliest() on the Kafka consumer worked. But this raised a question: in case of a failure, if I restart from a particular checkpoint or savepoint, will the application resume reading from the offsets recorded when that checkpoint/savepoint was created, or will it start from the first record in the Kafka partition?

Rahul Raj

On 11 October 2017 at 15:44, Aljoscha Krettek <[hidden email]> wrote:
When you restore from a savepoint (or checkpoint), the offsets committed in Kafka are completely ignored. Flink checkpoints the offsets at the time the checkpoint/savepoint is taken, and those are used as the read offsets when restoring.

Best,
Aljoscha
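The precedence described above can be summarized in a small, simplified model. This is not Flink's implementation: `StartOffsetModel`, `startOffset` and its parameters are hypothetical. It assumes restored state holds the next offset to read for a partition, and that `setStartFromEarliest()`, `setStartFromLatest()` and the default group-offset mode correspond to the three startup modes. The point it illustrates is that the configured start position only matters when there is no restored state.

```scala
object StartOffsetModel {
  // Hypothetical startup modes mirroring the consumer's start-position settings.
  sealed trait StartupMode
  case object GroupOffsets extends StartupMode // default: offsets committed for group.id
  case object Earliest extends StartupMode     // setStartFromEarliest()
  case object Latest extends StartupMode       // setStartFromLatest()

  // Picks the offset a partition starts reading from (simplified model).
  def startOffset(
      restored: Option[Long],  // next offset recorded in checkpoint/savepoint state
      mode: StartupMode,
      committed: Option[Long], // offset committed in Kafka/ZooKeeper for group.id
      earliest: Long,
      latest: Long,
      resetToSmallest: Boolean // auto.offset.reset == "smallest"
  ): Long = restored match {
    // Restored state always wins; the startup mode and Kafka-side offsets are ignored.
    case Some(offset) => offset
    case None =>
      mode match {
        case Earliest => earliest
        case Latest   => latest
        // No committed offset for this group: fall back to auto.offset.reset.
        case GroupOffsets => committed.getOrElse(if (resetToSmallest) earliest else latest)
      }
  }
}
```

For example, a job configured with `Earliest` but restored with a recorded offset of 42 resumes at 42, not at the start of the partition.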