Problem - Negative currentWatermark if the watermark assignment is made before connecting the streams


PedroMrChaves
Hello,

I have an application with two different streams of data: one carries a set of events and the other a set of rules that need to be matched against those events. To do this I connect the two streams and use a coFlatMap operator. The problem is that if I assign the timestamps and watermarks after the streams have been connected, everything works fine, but if I do it before, I get a negative currentWatermark at the window and the window operations have no effect. What could be the problem?

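If I assign before the connect:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/negativeWatermark.png>

If I assign after the connect:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/normalWatermark.png>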

Main Code:

    DataStream<CSVEvent> sourceStream = environment
            .addSource(new SampleDataGenerator(sourceData, true)).name("Source").setParallelism(1)
            // If I assign the timestamps here, the watermark seen at the window is negative
            // and the operations are not applied.
            .assignTimestampsAndWatermarks(new TimestampAssigner());

    DataStream<String> rulesStream = environment
            .socketTextStream(monitorAddress, monitorPort, DELIMITER)
            .name("Rules Stream")
            .setParallelism(1);

    SplitStream<RBEvent> processedStream = sourceStream.connect(rulesStream)
            .flatMap(new RProcessor(rulesPath)).name("RBProcessor").setParallelism(1)
            // If I assign the watermarks here instead, everything works fine:
            //.assignTimestampsAndWatermarks(new DynamicTimestampAssigner()).name("Assign Timestamps").setParallelism(1)
            .split(new Spliter());

    processedStream
            .select(RuleOperations.WINDOW_AGGRATION)
            .keyBy(new DynamicKeySelector())
            .window(new DynamicSlidingWindowAssigner())
            .apply(new AggregationOperation()).name("Aggregation Operation").setParallelism(1)
            .print().name("Windowed Rule Output").setParallelism(1);

(..omitted details..)



Timestamps and watermarks assigner:

public class TimestampAssigner implements AssignerWithPeriodicWatermarks<CSVEvent> {

    private final long MAX_DELAY = 2000; // 2 seconds
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(CSVEvent element, long previousElementTimestamp) {
        long timestamp = Long.parseLong(element.event.get(element.getTimeField()));
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        long potentialWM = currentMaxTimestamp - MAX_DELAY;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

}
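
To make the arithmetic of the assigner above concrete, here is a plain-Java sketch with no Flink dependency. The class name BoundedDelayDemo and the sample timestamps are hypothetical; only the max-minus-delay logic is taken from the post. It suggests that this assigner can itself report a small negative watermark (-2000) before the first event arrives, which is a different value from Long.MIN_VALUE.

```java
/**
 * Plain-Java sketch (no Flink dependency) of the watermark arithmetic used by
 * the TimestampAssigner in the post. BoundedDelayDemo is a hypothetical name.
 */
final class BoundedDelayDemo {

    private static final long MAX_DELAY = 2000; // 2 seconds, as in the post

    private long currentMaxTimestamp;                    // defaults to 0
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /** Mirrors extractTimestamp: remember the highest timestamp seen so far. */
    long onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    /** Mirrors getCurrentWatermark: highest timestamp minus the delay, never decreasing. */
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - MAX_DELAY;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedDelayDemo assigner = new BoundedDelayDemo();
        System.out.println(assigner.currentWatermark()); // -2000: no event seen yet
        assigner.onEvent(10_000L);
        System.out.println(assigner.currentWatermark()); // 8000
        assigner.onEvent(9_000L);                        // out-of-order event
        System.out.println(assigner.currentWatermark()); // 8000: watermark does not move back
        assigner.onEvent(12_500L);
        System.out.println(assigner.currentWatermark()); // 10500
    }
}
```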


Regards,
Pedro Chaves

Re: Problem - Negative currentWatermark if the watermark assignment is made before connecting the streams

Fabian Hueske-2
Hi Pedro,

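If I read your code correctly, you are not assigning timestamps and watermarks to the rules stream.

Flink automatically derives watermarks from all streams involved: an operator with two inputs, such as your connected flatMap, advances its watermark to the minimum of the watermarks received on its inputs.
If you do not assign a watermark to a stream, its watermark stays at the default, Long.MIN_VALUE, which is exactly the value you are observing.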
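
As a rough illustration of the point above, the following plain-Java sketch models how an operator with two inputs could combine their watermarks by taking the minimum. It is a simplified model, not Flink's actual implementation; the class TwoInputWatermarkDemo and its method names are hypothetical. Input 0 stands for the event stream and input 1 for the rules stream.

```java
import java.util.Arrays;

/**
 * Simplified model (no Flink dependency) of a multi-input operator's
 * event-time clock. TwoInputWatermarkDemo is a hypothetical name.
 */
final class TwoInputWatermarkDemo {

    private final long[] inputWatermarks;          // last watermark seen per input
    private long currentWatermark = Long.MIN_VALUE;

    TwoInputWatermarkDemo(int numInputs) {
        inputWatermarks = new long[numInputs];
        // An input that never sends a watermark stays at Long.MIN_VALUE.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's watermark. */
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkDemo op = new TwoInputWatermarkDemo(2);

        // Only the event stream (input 0) has watermarks: the operator is stuck.
        System.out.println(op.onWatermark(0, 8_000L) == Long.MIN_VALUE); // true

        // Once the rules stream (input 1) reports a very high watermark,
        // the operator follows the event stream.
        System.out.println(op.onWatermark(1, Long.MAX_VALUE)); // 8000
        System.out.println(op.onWatermark(0, 10_500L));        // 10500
    }
}
```

In the job from the question, the analogous remedy would presumably be to call assignTimestampsAndWatermarks on rulesStream as well, for example with an assigner whose getCurrentWatermark returns new Watermark(Long.MAX_VALUE) so that the rules stream never holds back event time. Whether that suits this particular job is an assumption and is not confirmed in the thread.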

Best,
Fabian

In reply to the post by PedroMrChaves of 2016-11-23 19:08 GMT+01:00.

View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-Negative-currentWatermark-if-the-watermark-assignment-is-made-before-connecting-the-streams-tp10315.html

Re: Problem - Negative currentWatermark if the watermark assignment is made before connecting the streams

Vinay Patil
In reply to this post by PedroMrChaves
Hi Pedro,

Curious to know: which tool are you using to see the watermark values?

Regards,
Vinay Patil


Re: Problem - Negative currentWatermark if the watermark assignment is made before connecting the streams

PedroMrChaves
Hi Vinay,

I'm simply using the NetBeans debugger.

Regards,
Pedro