| 
					
	
	 
		Hello,
 
				I have an application which has two different streams of data, one represents a set of events and the other a set of rules that need to be matched against the events. In order to do this I use a coFlatMapOperator. The problem is that if I assign the timestamps and watermarks after the streams have been connected everything works fine but if I do it before, I get a negative currentwatermark at the window and the operations on windows have no effect. What could be the problem? If I assign Before the connect: ![]() If I assign After the connect: ![]() Main Code: DataStream<CSVEvent> sourceStream = environment .addSource(new SampleDataGenerator(sourceData, true)).name("Source").setParallelism(1) .assignTimestampsAndWatermarks(new TimestampAssigner()); // if I assign the timestamps here the watermak seen at the window is negative and the operations are not applied DataStream<String> rulesStream = environment .socketTextStream(monitorAddress, monitorPort, DELIMITER) .name("Rules Stream") .setParallelism(1); SplitStream<RBEvent> processedStream = sourceStream.connect(rulesStream) .flatMap(new RProcessor(rulesPath)).name("RBProcessor").setParallelism(1) //.assignTimestampsAndWatermarks(new DynamicTimestampAssigner()).name("Assign Timestamps").setParallelism(1) // If I assign the watermarks here everything works fine .split(new Spliter()); processedStream .select(RuleOperations.WINDOW_AGGRATION) .keyBy(new DynamicKeySelector()) .window(new DynamicSlidingWindowAssigner()) .apply(new AggregationOperation()).name("Aggregation Operation").setParallelism(1) .print().name("Windowed Rule Output").setParallelism(1); (..omitted details..) Timestamps and watermarks assigner: public class TimestampAssigner implements AssignerWithPeriodicWatermarks<CSVEvent> { private final long MAX_DELAY = 2000; // 2 seconds private long currentMaxTimestamp; private long lastEmittedWatermark = Long.MIN_VALUE; @Override public long extractTimestamp(CSVEvent element, long previousElementTimestamp) { long timestamp = Long.parseLong(element.event.get(element.getTimeField())); currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } @Override public Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the out-of-orderness bound long potentialWM = currentMaxTimestamp - MAX_DELAY; if (potentialWM >= lastEmittedWatermark) { lastEmittedWatermark = potentialWM; } return new Watermark(lastEmittedWatermark); } } Regards, Pedro Chaves 
				Best Regards,
 
	Pedro Chaves  | 
			
| 
					
	
	 Hi Pedro, if I read you code correctly, you are not assigning timestamps and watermarks to the rules stream. Flink automatically derives watermarks from all streams involved.  If you do not assign a watermark, the default is watermark is Long.MIN_VALUE which is exactly the value you are observing. Best, Fabian 2016-11-23 19:08 GMT+01:00 PedroMrChaves <[hidden email]>: Hello,  | 
			
| 
					
	 
				In reply to this post by PedroMrChaves
			 
	Hi Pedro, Curious to know , Which tool you are using to see the watermark values ? Regards, Vinay Patil On Wed, Nov 23, 2016 at 11:38 PM, PedroMrChaves [via Apache Flink User Mailing List archive.] <[hidden email]> wrote: Hello,  | 
			
| 
					
	
	 
		Hi Vinay ,
 
				I'm simply using Netbeans Debugger. Regards, Pedro 
				Best Regards,
 
	Pedro Chaves  | 
			
| Free forum by Nabble | Edit this page | 
	
	
		