How to write WatermarkStrategy in Scala?



Lu Weizheng
Hi there,

Flink 1.11 comes with the new WatermarkStrategy API for assigning timestamps and watermarks, but I can't find any example of it in Scala. I have a (String, Long) stream; can anyone help me implement a WatermarkStrategy for it? I would be really grateful!

val input: DataStream[(String, Long)] = ...
val watermark = input.assignTimestampsAndWatermarks(
  WatermarkStrategy.forGenerator(...)
)

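import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkOutput}

class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] {
  // Events may arrive up to 60 seconds (in milliseconds) out of order
  final private val maxOutOfOrderness = 60 * 1000L

  // Highest event timestamp seen so far
  private var currentMaxTimestamp = 0L

  override def onEvent(event: (String, Long), eventTimestamp: Long, output: WatermarkOutput): Unit = {
    currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp)
  }

  // Called periodically by the framework: emit (max timestamp - allowed lateness)
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
  }
}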
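To make the generator's behaviour concrete, here is a dependency-free sketch that replays the same logic outside Flink. `SimOutput`, `SimGenerator`, `SimPeriodicGenerator` and `WatermarkSimulation` are hypothetical stand-ins invented for illustration, not Flink classes, and the sketch assumes one periodic emit after every event (in Flink the emit interval is independent of the events).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Flink's WatermarkOutput / WatermarkGenerator,
// so the logic can run without a Flink dependency.
trait SimOutput { def emitWatermark(ts: Long): Unit }

trait SimGenerator[T] {
  def onEvent(event: T, eventTimestamp: Long, output: SimOutput): Unit
  def onPeriodicEmit(output: SimOutput): Unit
}

// Same logic as MyPeriodicGenerator: track the max timestamp seen and
// emit (max - 60 s) whenever the periodic emit fires.
class SimPeriodicGenerator extends SimGenerator[(String, Long)] {
  private val maxOutOfOrderness = 60 * 1000L
  private var currentMaxTimestamp = 0L

  override def onEvent(event: (String, Long), eventTimestamp: Long, output: SimOutput): Unit =
    currentMaxTimestamp = math.max(currentMaxTimestamp, eventTimestamp)

  override def onPeriodicEmit(output: SimOutput): Unit =
    output.emitWatermark(currentMaxTimestamp - maxOutOfOrderness)
}

object WatermarkSimulation {
  // Feeds events one by one, fires a periodic emit after each event,
  // and collects the emitted watermarks.
  def run(events: Seq[(String, Long)]): Seq[Long] = {
    val emitted = ArrayBuffer.empty[Long]
    val out = new SimOutput { def emitWatermark(ts: Long): Unit = emitted += ts }
    val gen = new SimPeriodicGenerator
    events.foreach { e =>
      gen.onEvent(e, e._2, out) // timestamp taken from the tuple's Long field
      gen.onPeriodicEmit(out)
    }
    emitted.toList
  }

  def main(args: Array[String]): Unit =
    println(run(Seq(("a", 100000L), ("b", 90000L), ("c", 130000L)))) // List(40000, 40000, 70000)
}
```

With events at 100 s, 90 s and 130 s the watermark goes 40 s, 40 s, 70 s: the late event at 90 s does not move it backwards. Because `currentMaxTimestamp` starts at 0, a periodic emit before the first event would produce -60000. For this common bounded-out-of-orderness case, Flink 1.11 also provides `WatermarkStrategy.forBoundedOutOfOrderness(Duration)`, which may remove the need for a custom generator.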



Re: How to write WatermarkStrategy in Scala?

Dawid Wysakowicz

Hi,

Regrettably, I must admit that WatermarkStrategy is not very Scala-friendly :(


1) After a couple of tries, the most reliable approach I can recommend is to pass everything in as anonymous classes:


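import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

val watermark = input.assignTimestampsAndWatermarks(
  WatermarkStrategy.forGenerator[(String, Long)](
    // Supplier that Flink calls to create the generator
    new WatermarkGeneratorSupplier[(String, Long)] {
      override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] =
        new MyPeriodicGenerator
    }
  )
    // The event timestamp is the Long field of the tuple
    .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
      override def extractTimestamp(t: (String, Long), l: Long): Long = t._2
    })
)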
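As I understand the design, `forGenerator` takes a supplier rather than a generator because the strategy is serialized and each parallel instance then asks the supplier for its own generator. The sketch below imitates that pattern in plain Scala; `Generator`, `GeneratorSupplier`, `BoundedGenerator` and `SupplierSketch` are hypothetical stand-ins, not Flink classes (Flink's real `createWatermarkGenerator` also takes a `WatermarkGeneratorSupplier.Context`, omitted here).

```scala
// Hypothetical stand-ins shaped like Flink's WatermarkGenerator and
// WatermarkGeneratorSupplier (the Context parameter is omitted).
trait Generator[T] {
  def onEvent(event: T, eventTimestamp: Long): Unit
  def currentWatermark: Long
}

trait GeneratorSupplier[T] extends java.io.Serializable {
  def createGenerator(): Generator[T]
}

// Stateful generator: max timestamp seen minus a fixed bound.
class BoundedGenerator(boundMillis: Long) extends Generator[(String, Long)] {
  private var maxTs = 0L
  def onEvent(event: (String, Long), eventTimestamp: Long): Unit =
    maxTs = math.max(maxTs, eventTimestamp)
  def currentWatermark: Long = maxTs - boundMillis
}

object SupplierSketch {
  // Anonymous class, as in option 1): every call builds a fresh generator,
  // so each caller gets its own independent state.
  val supplier: GeneratorSupplier[(String, Long)] = new GeneratorSupplier[(String, Long)] {
    override def createGenerator(): Generator[(String, Long)] = new BoundedGenerator(60 * 1000L)
  }

  def main(args: Array[String]): Unit = {
    val g1 = supplier.createGenerator()
    val g2 = supplier.createGenerator()
    g1.onEvent(("a", 100000L), 100000L)
    println(g1.currentWatermark) // 40000
    println(g2.currentWatermark) // -60000, untouched by g1's event
  }
}
```

The anonymous class returns a new `BoundedGenerator` on every call, so the event fed to `g1` leaves `g2` at its initial -60000. In this sketch, returning one pre-built instance from the supplier would instead make `g1` and `g2` share `maxTs`.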


2) With Scala 2.12 you can try relying on the automatic conversion of Scala lambdas to Java SAM (single abstract method) interfaces. Unfortunately, when I tried it, it failed for the timestamp assigner with errors in the serialization stack. I have not been able to identify the root cause yet, so I cannot fully recommend it.


val watermark = input.assignTimestampsAndWatermarks(
  WatermarkStrategy.forGenerator[(String, Long)](
    _ => new MyPeriodicGenerator // lambda converted to WatermarkGeneratorSupplier
  )
    .withTimestampAssigner((e, _) => e._2)
)
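When experimenting with option 2), it can help to separate two concerns: whether the lambda converts to the intended SAM type, and whether the resulting object survives Java serialization. The sketch below is plain Scala 2.12+ and does not reproduce the Flink failure described above; `TimestampFn` is a hypothetical stand-in with the same shape as `SerializableTimestampAssigner`, and `isJavaSerializable` is a hypothetical helper. Giving the lambda an explicit SAM type, as done here, also sidesteps overload resolution, which is relevant because Flink's `withTimestampAssigner` is overloaded (it also accepts a `TimestampAssignerSupplier`); whether that is related to the serialization error is an open assumption.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in with the same shape as Flink's SerializableTimestampAssigner:
// a single abstract method plus java.io.Serializable.
trait TimestampFn[T] extends java.io.Serializable {
  def extractTimestamp(element: T, recordTimestamp: Long): Long
}

object SamConversionSketch {
  // Scala 2.12+ converts this lambda to TimestampFn because the expected
  // type is stated explicitly, so no overload has to be chosen.
  val assigner: TimestampFn[(String, Long)] = (e, _) => e._2

  // True if plain Java serialization accepts the object, roughly what must
  // succeed before a user function can be shipped to the cluster.
  def isJavaSerializable(obj: AnyRef): Boolean =
    Try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      try oos.writeObject(obj) finally oos.close()
    }.isSuccess

  def main(args: Array[String]): Unit = {
    println(assigner.extractTimestamp(("a", 42L), Long.MinValue)) // 42
    println(isJavaSerializable(assigner))
  }
}
```

Calling `isJavaSerializable` on an assigner before handing it to Flink is a cheap way to tell whether a failure comes from serialization itself or from somewhere else; the result for a lambda can depend on the Scala version and on what the lambda captures, so no expected value is given for it here.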


I created a ticket to improve the situation here: https://issues.apache.org/jira/browse/FLINK-18873


Best,

Dawid


In reply to Lu Weizheng's message of 08/08/2020 10:18.




Re: How to write WatermarkStrategy in Scala?

Lu Weizheng
Thank you, Dawid,

I am using Scala 1.11. I had come up with the same solution as your option 1), but since it did not feel very Scala-friendly, I came here to ask. I hope the new API won't change significantly.

Best Regards,
Weizheng

In reply to Dawid Wysakowicz's message of 10 August 2020, 8:29 PM.
