Windows getting created only on first execution


Windows getting created only on first execution

Rahul Raj
Hi,

I have written a program which reads data from Kafka, parses the JSON, and performs a reduce operation. The problem I am facing is that the program executes perfectly the first time it runs on a given day, but when I kill it and execute it again, an empty output file is created. Even after recompiling and rerunning, the output file is empty.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// json_decode is my own helper that pulls the relevant field out of the
// JSON message (implementation omitted here).

val kafkaConsumer = new FlinkKafkaConsumer08[String](
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val messageStream = env.addSource(kafkaConsumer).filter(t => t.contains(pattern))

val mts = messageStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
  // Highest timestamp seen so far; the watermark must never go backwards,
  // so track the running maximum rather than the last element's timestamp.
  var ts = Long.MinValue

  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
    val timestamp = json_decode(element).toLong
    ts = Math.max(ts, timestamp)
    timestamp
  }

  override def getCurrentWatermark(): Watermark = new Watermark(ts)
})

val output = mts
  .keyBy(t => json_decode(t))
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .allowedLateness(Time.seconds(5))
  .reduce((v1, v2) => v1 + "----" + v2)

output.writeAsText(path).setParallelism(1)


I am using the filesystem state backend. I am guessing this problem is related to state or memory being cleaned up between runs, but I don't understand what's happening.
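
For reference, this is roughly how the state backend is set up (a minimal sketch; the checkpoint directory and interval below are just examples, not the real job's values):

import org.apache.flink.runtime.state.filesystem.FsStateBackend

// Illustrative path and interval only.
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"))
env.enableCheckpointing(60000) // checkpoint every 60 seconds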

Any help?


Rahul Raj


Re: Windows getting created only on first execution

Aljoscha Krettek
Hi,

I think the problem is that your Kafka consumer has the same group.id across those two runs. This means that it picks up the last committed read position of the previous run and thus does not read anything new. If you change the group.id for the second run, you should be able to read your data again.
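
For example, something like this (a sketch; the group name is just an example value):

// Give the second run a fresh consumer group so the committed offsets
// of the previous run are not picked up (group name is illustrative).
val props = params.getProperties
props.setProperty("group.id", "my-job-run-2")

val kafkaConsumer = new FlinkKafkaConsumer08[String](
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  props)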

Best,
Aljoscha

Re: Windows getting created only on first execution

Rahul Raj
Changing the group id didn't work for me; using setStartFromEarliest() on the Kafka consumer did. But this raised one point of confusion: in case of failure, if I restore from a particular checkpoint or savepoint, will the application start reading from the offset at which the checkpoint/savepoint was created, or will it start reading from the first record in the Kafka partition?
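
For reference, this is the change that worked (a minimal sketch; everything else is unchanged from the original program):

val kafkaConsumer = new FlinkKafkaConsumer08[String](
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

// Ignore any committed group offsets and always start from the
// earliest available record in each partition.
kafkaConsumer.setStartFromEarliest()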

Rahul Raj 

Re: Windows getting created only on first execution

Aljoscha Krettek
Hi,

When you restore from a savepoint (or checkpoint), the offsets committed in Kafka are completely ignored. Flink checkpoints the consumer's offsets itself at the time the checkpoint/savepoint is taken, and those offsets are used as the read positions when restoring.
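
In other words, enabling checkpointing is enough for the consumer to snapshot its offsets; a minimal sketch (the interval and savepoint path are just examples):

// With checkpointing enabled, the Kafka consumer stores its offsets in
// each checkpoint; on restore, these take precedence over offsets
// committed in Kafka and over any setStartFrom*() setting.
env.enableCheckpointing(60000) // e.g. every 60 seconds

// A restore from a savepoint is then requested when submitting the job, e.g.:
//   bin/flink run -s <savepointPath> my-job.jar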

Best,
Aljoscha
