Hello,
I have an application with two different data streams: one carries a set of events and the other a set of rules that must be matched against those events. To do this I use a CoFlatMap operator. The problem is that if I assign the timestamps and watermarks after the streams have been connected, everything works fine, but if I assign them before the connect, the current watermark seen at the window is negative and the window operations have no effect. What could be the problem?

If I assign before the connect: [screenshot not available]
If I assign after the connect: [screenshot not available]

Main code:

    DataStream<CSVEvent> sourceStream = environment
            .addSource(new SampleDataGenerator(sourceData, true)).name("Source").setParallelism(1)
            .assignTimestampsAndWatermarks(new TimestampAssigner()); // if I assign the timestamps here, the watermark seen at the window is negative and the operations are not applied

    DataStream<String> rulesStream = environment
            .socketTextStream(monitorAddress, monitorPort, DELIMITER)
            .name("Rules Stream")
            .setParallelism(1);

    SplitStream<RBEvent> processedStream = sourceStream.connect(rulesStream)
            .flatMap(new RProcessor(rulesPath)).name("RBProcessor").setParallelism(1)
            //.assignTimestampsAndWatermarks(new DynamicTimestampAssigner()).name("Assign Timestamps").setParallelism(1) // if I assign the watermarks here, everything works fine
            .split(new Spliter());

    processedStream
            .select(RuleOperations.WINDOW_AGGRATION)
            .keyBy(new DynamicKeySelector())
            .window(new DynamicSlidingWindowAssigner())
            .apply(new AggregationOperation()).name("Aggregation Operation").setParallelism(1)
            .print().name("Windowed Rule Output").setParallelism(1);

(..omitted details..)
Timestamp and watermark assigner:

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class TimestampAssigner implements AssignerWithPeriodicWatermarks<CSVEvent> {

        private final long MAX_DELAY = 2000; // 2 seconds
        private long currentMaxTimestamp;
        private long lastEmittedWatermark = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(CSVEvent element, long previousElementTimestamp) {
            long timestamp = Long.parseLong(element.event.get(element.getTimeField()));
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            long potentialWM = currentMaxTimestamp - MAX_DELAY;
            if (potentialWM >= lastEmittedWatermark) {
                lastEmittedWatermark = potentialWM;
            }
            return new Watermark(lastEmittedWatermark);
        }
    }

Regards,
Pedro Chaves
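To make the arithmetic of the TimestampAssigner above easier to follow, here is a plain-Java replica of its watermark logic with the Flink types stripped out so it runs on its own. It is only a sketch: the class name BoundedDelayWatermarkDemo is hypothetical, and timestamps are passed in directly instead of being parsed from a CSVEvent. It shows that the watermark trails the largest timestamp seen by MAX_DELAY and never moves backwards when an out-of-order element arrives. Because currentMaxTimestamp starts at Java's default of 0, the value reported before any element arrives is -2000.

```java
// Plain-Java replica (not Flink code) of the TimestampAssigner's watermark arithmetic.
public class BoundedDelayWatermarkDemo {

    // Same 2-second out-of-orderness bound as in the post.
    private static final long MAX_DELAY = 2000;

    private long currentMaxTimestamp; // 0 until the first element arrives (Java default)
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /** Mirrors extractTimestamp: remembers the highest timestamp seen so far. */
    long extractTimestamp(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    /** Mirrors getCurrentWatermark: highest timestamp minus the bound, never decreasing. */
    long getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - MAX_DELAY;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedDelayWatermarkDemo assigner = new BoundedDelayWatermarkDemo();
        System.out.println(assigner.getCurrentWatermark()); // -2000: no element seen yet
        // Out-of-order input: 11000 arrives after 12000.
        for (long ts : new long[] {10_000L, 12_000L, 11_000L}) {
            assigner.extractTimestamp(ts);
            System.out.println("ts=" + ts + " -> watermark=" + assigner.getCurrentWatermark());
        }
    }
}
```

The late element (11000) does not pull the watermark back: it stays at 10000, which is 12000 minus the 2-second bound.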
Hi Pedro,

if I read your code correctly, you are not assigning timestamps and watermarks to the rules stream. Flink automatically derives watermarks from all streams involved: an operator with two inputs, such as the flatMap on your connected streams, forwards the minimum of the watermarks of its inputs. If you do not assign a watermark, the default watermark is Long.MIN_VALUE, which is exactly the value you are observing.

Best,
Fabian

2016-11-23 19:08 GMT+01:00 PedroMrChaves:
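Fabian's explanation can be illustrated with a small plain-Java model of a two-input operator. This is a simplified sketch, not Flink's actual source: the class and method names are hypothetical. The model keeps the last watermark from each input, both starting at Long.MIN_VALUE, and only advances its own watermark to the minimum of the two. With watermarks arriving only on the event input, the result stays pinned at Long.MIN_VALUE. Emitting Long.MAX_VALUE on the rules input is just one assumed way to stop that input from holding back event time; giving the rules stream some other suitable watermark would also unblock it. Assigning timestamps after the connect presumably works because that downstream assigner generates fresh watermarks from the elements it sees instead of relying on the combined watermark.

```java
// Plain-Java model (not Flink code) of how a two-input operator, such as
// connect(...).flatMap(...), combines the watermarks of its inputs.
public class TwoInputWatermarkDemo {

    // Hypothetical helper: tracks the last watermark seen on each input.
    static final class TwoInputWatermarkTracker {
        // An input that never receives a watermark stays at Long.MIN_VALUE.
        private long input1Watermark = Long.MIN_VALUE;
        private long input2Watermark = Long.MIN_VALUE;
        private long combinedWatermark = Long.MIN_VALUE;

        /** Records a watermark from input 1 (events) and returns the operator's watermark. */
        long onWatermark1(long wm) {
            input1Watermark = wm;
            return combine();
        }

        /** Records a watermark from input 2 (rules) and returns the operator's watermark. */
        long onWatermark2(long wm) {
            input2Watermark = wm;
            return combine();
        }

        // The operator can only advance to the minimum of its inputs' watermarks,
        // and it never moves its own watermark backwards.
        private long combine() {
            long min = Math.min(input1Watermark, input2Watermark);
            if (min > combinedWatermark) {
                combinedWatermark = min;
            }
            return combinedWatermark;
        }
    }

    public static void main(String[] args) {
        // Case 1: only the event stream (input 1) carries watermarks.
        TwoInputWatermarkTracker eventsOnly = new TwoInputWatermarkTracker();
        eventsOnly.onWatermark1(8_000L);
        long stuck = eventsOnly.onWatermark1(20_000L);
        System.out.println("Rules input without watermarks: " + stuck); // Long.MIN_VALUE

        // Case 2: the rules stream (input 2) also emits a watermark.
        TwoInputWatermarkTracker bothInputs = new TwoInputWatermarkTracker();
        bothInputs.onWatermark2(Long.MAX_VALUE); // assumed: rules input never holds back event time
        long advanced = bothInputs.onWatermark1(20_000L);
        System.out.println("Rules input with watermarks: " + advanced); // 20000
    }
}
```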
Hi Pedro,

I'm curious to know: which tool are you using to see the watermark values?

Regards,
Vinay Patil

On Wed, Nov 23, 2016 at 11:38 PM, PedroMrChaves [via Apache Flink User Mailing List archive.] wrote:
Hi Vinay,
I'm simply using the NetBeans debugger.
Regards,
Pedro