WaterMark & Eventwindow not fired correctly

aitozi

Hi,

I have encountered a problem: I generate and assign watermarks on the DataStream, then apply keyBy, an event-time window, and a window function.

In the log I can see that the watermarks and the event timestamps of the messages are correct, and I think the following conditions should trigger the window function:

1. watermark time >= window_end_time
2. there is data in [window_start_time, window_end_time)

I checked the log, and both conditions are satisfied. When I add a trigger (CountTrigger.of(5)), I can see in the log that the window apply function is invoked.

So I wonder why the window apply function cannot be triggered by event time and watermarks alone.

thanks,
aitozi
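The firing rule behind conditions 1 and 2 can be sketched in plain Java. This is an illustrative model, not Flink code: it assumes tumbling windows with zero offset and mirrors the behavior of Flink's default event-time trigger, which fires a window once the watermark reaches the window's maxTimestamp() (that is, end - 1) and the window holds at least one element. The class and method names are hypothetical.

```java
// Illustrative model only (not Flink code) of tumbling event-time windows.
public class EventTimeWindowModel {

    /** Start of the tumbling window (zero offset) that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Last timestamp that still belongs to the half-open window [start, start + size). */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /** A non-empty window fires once the watermark has reached its max timestamp. */
    static boolean shouldFire(long watermark, long start, long size, int elementCount) {
        return elementCount > 0 && watermark >= maxTimestamp(start, size);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows, in milliseconds
        long start = windowStart(12_345L, size);
        System.out.println(start);                               // 10000
        System.out.println(shouldFire(19_998L, start, size, 1)); // false
        System.out.println(shouldFire(19_999L, start, size, 1)); // true
        System.out.println(shouldFire(25_000L, start, size, 0)); // false: no data
    }
}
```

In this model a watermark of window_end_time - 1 is already enough, so condition 1 is sufficient but slightly stricter than necessary.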

Re: WaterMark & Eventwindow not fired correctly

Aljoscha Krettek
Hi,

Could you please provide a snippet of code or a minimal example that would help us reproduce your problem?

Best,
Aljoscha

> On 3. Aug 2017, at 17:41, aitozi <[hidden email]> wrote:
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: WaterMark & Eventwindow not fired correctly

aitozi

Hi,

My Flink version is 1.2.

I have been working on this problem for the past few days. Below is what I found.

When I use "assignTimestampsAndWatermarks" with the same parallelism (240) as the preceding operator, which has two inputs (it is a "connected" CoFlatMap operator with parallelism 240), the watermark does not advance.

Then I looked into the source code: StreamTwoInputProcessor#processInput, which is called by TwoInputStreamTask, calls processElement1() and processElement2(), but neither of them goes through processElement() in StreamInputProcessor, which is where the timestamp would be extracted (as in TimestampsAndPeriodicWatermarksOperator).

As a result, the timestamps are not updated, and because my watermark is computed from them (in the same way as BoundedOutOfOrdernessTimestampExtractor), it does not advance either.

So, is it a bug that timestamps are not updated when dealing with a two-input stream?

P.S. My English is not very good; I hope you can understand me :)

thanks,
aitozi

Re: WaterMark & Eventwindow not fired correctly

Aljoscha Krettek
Hi,

So I understood that you have roughly this pipeline:

Input 1 --\
           |- CoFlatMap - TimestampAndWatermarkAssigner - KeyBy - Window    
Input 2 --/

If the timestamp assigner is after the CoFlatMap, the processElement() method of the extractor should still be called. It is not called by the StreamInputProcessor but by ChainingOutput [1], which basically connects the two-input CoFlatMap to the one-input operator that comes after it. There could still be a bug in there somewhere, however.
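The chaining path described here can be illustrated with a simplified, hypothetical model: the two-input operator emits through an output object that calls the next operator's processElement() directly, so a chained one-input timestamp assigner still sees every record from both inputs. All types below are invented for illustration; they only mirror the concepts and are not Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical model of operator chaining (NOT Flink's actual API).
public class ChainingModel {

    interface Output<T> { void collect(T record); }

    interface OneInputOperator<T> { void processElement(T record); }

    /** Forwards every emitted record straight into the next operator's processElement(). */
    static final class ChainingOutput<T> implements Output<T> {
        private final OneInputOperator<T> next;
        ChainingOutput(OneInputOperator<T> next) { this.next = next; }
        @Override public void collect(T record) { next.processElement(record); }
    }

    /** Stand-in for a timestamp assigner: extracts a timestamp and tracks the maximum. */
    static final class TimestampAssigner<T> implements OneInputOperator<T> {
        private final ToLongFunction<T> extractor;
        long currentMaxTimestamp = Long.MIN_VALUE;
        final List<Long> seen = new ArrayList<>();
        TimestampAssigner(ToLongFunction<T> extractor) { this.extractor = extractor; }
        @Override public void processElement(T record) {
            long ts = extractor.applyAsLong(record);
            seen.add(ts);
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        }
    }

    /** Stand-in for a two-input CoFlatMap: both inputs emit through the same output. */
    static final class CoFlatMap<T> {
        private final Output<T> out;
        CoFlatMap(Output<T> out) { this.out = out; }
        void processElement1(T value) { out.collect(value); }
        void processElement2(T value) { out.collect(value); }
    }

    public static void main(String[] args) {
        TimestampAssigner<Long> assigner = new TimestampAssigner<>(v -> v);
        CoFlatMap<Long> coFlatMap = new CoFlatMap<>(new ChainingOutput<>(assigner));
        coFlatMap.processElement1(1_000L); // record from input 1
        coFlatMap.processElement2(3_000L); // record from input 2
        System.out.println(assigner.currentMaxTimestamp); // 3000
    }
}
```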

Could you send me the relevant parts of your code so that I can have a look, or provide a minimal example?

Best,
Aljoscha

[1] https://github.com/apache/flink/blob/6f5fa7f741538207244368c275bee9958c43a25a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L394

> On 7. Aug 2017, at 19:21, aitozi <[hidden email]> wrote:

Re: WaterMark & Eventwindow not fired correctly

aitozi
Hi, below is my code:

splitStream.select(duringTime + "")
        .map(new KeyMapFunc())
        .assignTimestampsAndWatermarks(new DelaySaltWatermarks())
        .setParallelism(300)
        .keyBy(_SQL, _KEY, _SALT)
        .window(TumblingEventTimeWindows.of(Time.seconds(duringTime / 10)))
        .apply(new WindowSaltFunc())
        .keyBy(_SQL, _KEY)
        .window(TumblingEventTimeWindows.of(Time.seconds(duringTime)))
        .apply(new WindowFunc())
        .addSink(new FlinkKafkaProducer010<>("topic", new SimpleSerializationSchema(), this.properties));

and

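import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// ContentMessage and MAX_OUT_OF_ORDER are defined elsewhere in the application.
public class DelaySaltWatermarks implements AssignerWithPeriodicWatermarks<ContentMessage> {

    // Highest event timestamp seen by this parallel instance. It starts at 0, so an
    // instance that never receives elements keeps emitting 0 - MAX_OUT_OF_ORDER.
    private long currentMaxTimestamp;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Called periodically; the watermark trails the max timestamp by MAX_OUT_OF_ORDER.
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDER);
    }

    @Override
    public long extractTimestamp(ContentMessage contentMessage, long l) {
        long timestamp = contentMessage.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}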

And when I change the parallelism of assignTimestampsAndWatermarks (the setParallelism(300) above), the window fires.

thanks,
aitozi
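One possible reason why changing the assigner's parallelism makes a difference, stated as an assumption rather than a confirmed diagnosis: when no partitioning is specified and two connected operators have the same parallelism, Flink connects them one-to-one (forward), whereas a different parallelism makes Flink redistribute records round-robin (rebalance). With a forward connection, an assigner instance whose upstream instance emits nothing never sees an element. The hypothetical model below counts such idle instances; for simplicity it uses a single global round-robin counter, which is not exactly how Flink's rebalance partitioner works.

```java
import java.util.Arrays;

// Hypothetical model: records received per downstream instance under
// forward (one-to-one) vs. rebalance (round-robin) distribution.
public class PartitioningModel {

    /** Forward: downstream instance i only sees what upstream instance i emitted. */
    static long[] forward(long[] upstreamCounts) {
        return upstreamCounts.clone();
    }

    /** Rebalance: all records are dealt round-robin across the downstream instances. */
    static long[] rebalance(long[] upstreamCounts, int downstreamParallelism) {
        long[] received = new long[downstreamParallelism];
        int next = 0; // single global counter, a simplification
        for (long count : upstreamCounts) {
            for (long r = 0; r < count; r++) {
                received[next] += 1;
                next = (next + 1) % downstreamParallelism;
            }
        }
        return received;
    }

    /** Number of downstream instances that received no records at all. */
    static long idleInstances(long[] received) {
        return Arrays.stream(received).filter(c -> c == 0).count();
    }

    public static void main(String[] args) {
        long[] upstream = {5, 0, 7, 0}; // two upstream instances emit nothing
        System.out.println(idleInstances(forward(upstream)));      // 2
        System.out.println(idleInstances(rebalance(upstream, 3))); // 0
    }
}
```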

Re: WaterMark & Eventwindow not fired correctly

Aljoscha Krettek
Hi,

So when the parallelism of the timestamp assigner is different from the parallelism of the map(KeyMapFunc()) or the window, then it works? But when the parallelism is the same, it does not work?

If this is true, then I would assume that some parallel instances of the timestamp assigner don't get any events and therefore don't advance the watermark. This, in turn, would mean that the downstream watermark also doesn't advance. Could you check in the web interface whether all parallel instances of the assigner are processing elements when you use the same parallelism for all operations?
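The propagation effect described here can be sketched as follows: an operator with several input channels can only advance its watermark to the minimum of the latest watermarks received on each channel, so a single channel that never advances holds back the whole window operator. This is a simplified model written for illustration, not Flink's implementation; the class name is hypothetical.

```java
import java.util.Arrays;

// Simplified model (not Flink's implementation) of combining watermarks
// from several input channels: the result is the minimum over channels.
public class WatermarkMinModel {

    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkMinModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        currentWatermark = Math.max(currentWatermark, min); // never moves backwards
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkMinModel window = new WatermarkMinModel(3);
        window.onWatermark(0, 20_000L);
        window.onWatermark(1, 25_000L);
        // Channel 2 stands for an assigner instance that has sent nothing yet,
        // so the combined watermark cannot advance.
        System.out.println(window.onWatermark(1, 30_000L)); // -9223372036854775808
        System.out.println(window.onWatermark(2, 18_000L)); // 18000
    }
}
```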

Best,
Aljoscha

On 9. Aug 2017, at 11:33, aitozi <[hidden email]> wrote:
