Using Flink EventTime feature, I implement the class AssignerWithPeriodicWatermark such that:
public static class SampleTimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple3<String, Long, JSONObject>> { In addition, I set the watermark interval to 100 milliseconds: env.getConfig().setAutoWatermarkInterval(100);But when I check the logs, some watermarks are -3000, so in getCurrentWatermark method, it considers the MAX_TIMESTAMP zero (0 - 3000 = -3000), while I can see in the logs that the MAX_TIMESTAMP has a value greater than zero! Here is a part of the output: Max TimeStamp : 1532934243136 Max TimeStamp : 1532934243136 Max TimeStamp : 1532934243144 Max TimeStamp : 1532934243144 Max TimeStamp : 1532934243152 Max TimeStamp : 1532934243152 Max TimeStamp : 1532934243160 Max TimeStamp : 1532934243160 Max TimeStamp : 1532934243168 Max TimeStamp : 1532934243168 Current WatreMark : 1532934240168 Current WatreMark : -3000 Current WatreMark : -3000 Current WatreMark : 1532934240168 Max TimeStamp : 1532934243176 Max TimeStamp : 1532934243176 Max TimeStamp : 1532934243184 Max TimeStamp : 1532934243200 Max TimeStamp : 1532934243208 Max TimeStamp : 1532934243184 |
HI Soheil,
That may relate to your parallelism since each extractor instance compute its own watermarks. Try to print the max timestamps with the current thread’s name and you will notice this. Best, Xingcan
|
Free forum by Nabble | Edit this page |