assignTimestampsAndWatermarks not working as expected


assignTimestampsAndWatermarks not working as expected

Jayesh Patel

Can anybody see what’s wrong with the following code? I am using Flink 1.2 and have tried running it in Eclipse (local mode) as well as on a 3-node cluster, and it’s not behaving as expected.

 

The idea is to have a custom source collect messages from a JMS topic (for now I have a fake source that generates some out-of-order messages whose event time is delayed by no more than 5 seconds). The source doesn’t call collectWithTimestamp() or emitWatermark().

The messages (events) include the event time. To allow for late or out-of-order messages, I use assignTimestampsAndWatermarks with a BoundedOutOfOrdernessTimestampExtractor whose extractTimestamp() method retrieves the event time from the event.
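
For readers unfamiliar with the bounded out-of-orderness idea described above, here is a minimal sketch in plain Java with no Flink dependency. It is a simplified model, not Flink's implementation: the class name BoundedWatermarkSketch and the methods observe(), currentWatermark() and isBehindWatermark() are hypothetical names chosen for illustration. The assumption is that the watermark trails the largest event time seen so far by a fixed bound (5 seconds in the post), so an event counts as late only if it arrives more than that bound behind the newest event.

```java
/**
 * Simplified model of bounded out-of-orderness watermarking.
 * NOT Flink's class; all names here are hypothetical.
 */
class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    // Largest event time observed so far.
    private long maxTimestampSeen;

    public BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start high enough above Long.MIN_VALUE that subtracting the bound cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records an event time and returns it unchanged. */
    public long observe(long eventTimeMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimeMs);
        return eventTimeMs;
    }

    /** The watermark trails the largest observed event time by the bound. */
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** True if an event with this time is at or behind the current watermark. */
    public boolean isBehindWatermark(long eventTimeMs) {
        return eventTimeMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch w = new BoundedWatermarkSketch(5_000L);
        // Made-up event times in milliseconds; the last one arrives 6 s behind the newest.
        long[] eventTimes = {10_000L, 12_000L, 9_000L, 20_000L, 14_000L};
        for (long t : eventTimes) {
            boolean late = w.isBehindWatermark(t);
            w.observe(t);
            System.out.println("event=" + t + " late=" + late
                    + " watermark=" + w.currentWatermark());
        }
    }
}
```

In this model, events that are at most 5 seconds behind the newest one are never behind the watermark, which matches the assumption stated about the fake source.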

 

When I run this job, I don’t get the printout from the extractTimestamp() method, nor do I get the logTuples.print() or stampedLogs.print() output. When running in the local environment (Eclipse) I do see the printouts from the fake source (MockSource, not shown here), but I don’t even get those when running on my 3-node cluster with a parallelism of 3.

 

public static void main(String[] args) throws Exception {
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
       env.getConfig().setAutoWatermarkInterval(2000); // just for debugging, didn't affect the behavior

       DataStream<Message> logs = env.addSource(new MockSource());
       DataStream<Tuple2<String, CEFEvent>> logTuples = logs.map(new ParseEvent());
       logTuples.print();

       DataStream<Tuple2<String, CEFEvent>> stampedLogs = logTuples.assignTimestampsAndWatermarks(
              new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, CEFEvent>>(Time.seconds(5)) {
                     private static final long serialVersionUID = 1L;

                     @Override
                     public long extractTimestamp(Tuple2<String, CEFEvent> element) {
                            // Extract the event time (epoch milliseconds) from the event
                            long eventTime = element.f1.getEventStartTime().toInstant().toEpochMilli();
                            System.out.println("returning event time " + eventTime);
                            return eventTime;
                     }
              });
       stampedLogs.print();

       env.execute("simulation");
}

 

Thank you,

Jayesh



RE: assignTimestampsAndWatermarks not working as expected

Jayesh Patel

I figured out what’s wrong: there was a silly mistake on my side. There is nothing wrong with the code here, but please do let me know if you see anything wrong with my approach.

 

Thank you.

 

From: Jayesh Patel
Sent: Thursday, May 04, 2017 10:00 AM
To: '[hidden email]' <[hidden email]>
Subject: assignTimestampsAndWatermarks not working as expected

 


Re: assignTimestampsAndWatermarks not working as expected

Kostas Kloudas
Hi Jayesh,

Glad that it finally worked! 

At first glance, I cannot spot anything wrong with the code itself.
The only thing to note is that the logs and the printouts from your code end up in
different locations, and normally they are not printed to the console.

Thanks,
Kostas

On May 4, 2017, at 6:45 PM, Jayesh Patel <[hidden email]> wrote:

I figured out what’s wrong: there was a silly mistake on my side. There is nothing wrong with the code here, but please do let me know if you see anything wrong with my approach.
 
Thank you.
 