Hi there,
Flink 1.11 comes with the new WatermarkStrategy API to assign timestamp and watermark. I find there is no example in Scala. I have a (String, Long) Stream, can anyone help implement WatermarkStrategy?
I will be really gratefully!
val input: DataStream[(String, Long)] = ...
val watermark = input.assignTimestampsAndWatermarks(
WatermarkStrategy.forGenerator(...)
)class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] { |
Hi, Regrettably I must admit the WatermarkStrategy is not very scala friendly :(
1) After a couple of tries what I'd recommend as the most reliable is to pass it through anonymous classes:
.assignTimestampsAndWatermarks( new MyPeriodicGenerator
2) With scala 2.12 you can try the automatic conversion of
scala's lambdas to java's SAM, but unfortunately when I tried it,
it failed for timestamp assigner with some problems in the
serialization stack. I could not identify the root problem of it
yet. Therefore I can not fully recommend it.
.assignTimestampsAndWatermarks(
I create a ticket to improve the situation here:
https://issues.apache.org/jira/browse/FLINK-18873
Best, Dawid
On 08/08/2020 10:18, Lu Weizheng wrote:
|
Thank you Dawid,
I
am using Scala 1.11. I come up with the same 1) solution as it may not be scala friendly. So I come here to ask question. Hope the new API may not change significantly.
Best
Regards,
Weizheng
|
Free forum by Nabble | Edit this page |