Flink EventTime Processing Watermark is always coming as 9223372036854725808

anuj.aj07

I am trying to use a process function to do some processing on a set of events. I am using event time and a keyed stream. The issue I am facing is that the watermark value is always -9223372036854775808 (Long.MIN_VALUE). I have added print statements to debug it, and the output looks like this:

timestamp------1583128014000 extractedTimestamp 1583128014000 currentwatermark-----9223372036854775808

timestamp------1583128048000 extractedTimestamp 1583128048000 currentwatermark-----9223372036854775808

timestamp------1583128089000 extractedTimestamp 1583128089000 currentwatermark-----9223372036854775808

The timestamp and the extracted timestamp change, but the watermark is never updated. As a result, the records are never processed, because the event-time timer registered in processElement can only fire once the watermark passes it.
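About the number itself: the debug statement prints "currentwatermark----" (four dashes) followed by the watermark, so the fifth dash in the output is a minus sign. The value is therefore -9223372036854775808, which is Long.MIN_VALUE. The timer service reports this value while no watermark has reached the operator yet. The stand-alone snippet below reproduces the printed line. The class name is made up for illustration.

```java
// Hypothetical demo class: rebuilds the debug line printed by processElement
// for a given watermark value.
final class WatermarkPrintDemo {
    static String line(long currentWatermark) {
        // Same string concatenation as the println in the original processElement.
        return "currentwatermark----" + currentWatermark;
    }

    public static void main(String[] args) {
        // Long.MIN_VALUE is -9223372036854775808; its minus sign becomes the fifth dash.
        System.out.println(line(Long.MIN_VALUE));
    }
}
```

Running it prints currentwatermark-----9223372036854775808, which matches the log above character for character.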


        DataStream<GenericRecord> dataStream = env.addSource(searchConsumer).name("search_list_keyless");
        DataStream<GenericRecord> dataStreamWithWaterMark = dataStream.assignTimestampsAndWatermarks(new SessionAssigner());

        try {
            dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
                StringBuilder builder = new StringBuilder();
                builder.append(record.get("session_id"));
                builder.append(record.get("user_id"));
                return builder.toString();
            }).process(new MatchFunction()).print();
        } catch (Exception e) {
            e.printStackTrace();
        }
        env.execute("start session process");
    }

    public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
        @Override
        public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
            long timestamp = (long) record.get("event_ts");
            System.out.println("timestamp------" + timestamp);
            return timestamp;
        }

        @Override
        public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
            // simply emit a watermark with every event, lagging 30 s behind it
            System.out.println("extractedTimestamp " + extractedTimestamp);
            return new Watermark(extractedTimestamp - 30000);
        }
    }

    // in MatchFunction:
    @Override
    public void processElement(GenericRecord record, Context context, Collector<Object> collector) throws Exception {
        TimerService timerService = context.timerService();
        System.out.println("currentwatermark----" + timerService.currentWatermark());
        if (context.timestamp() > timerService.currentWatermark()) {
            Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
            // value() returns null for a key without state, unless the descriptor sets a default
            PriorityQueue<GenericRecord> queue = queueval == null ? null : queueval.f1;
            long startTime = queueval == null ? 0L : queueval.f0;
            System.out.println("starttime----" + startTime);

            if (queue == null) {
                queue = new PriorityQueue<>(10, new TimeStampComprator());
                startTime = (long) record.get("event_ts");
            }
            // buffer the current event so it can be processed when the timer fires
            queue.add(record);
            queueState.update(new Tuple2<>(startTime, queue));
            timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
        }
    }
}
Please help me understand what I am doing wrong.

--
Thanks & Regards,
Anuj Jain
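For comparison, this sketch shows roughly what the process function would be expected to print with the assigner above if watermarks were flowing on a single channel. It is a plain-Java simulation, not Flink code, and the class is hypothetical. It assumes, based on Flink's punctuated-watermark operator, that each watermark is forwarded right after the element that produced it, and only when it is larger than the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-channel simulation of the SessionAssigner above.
// Assumptions (modeled on Flink's punctuated-watermark operator): each event yields
// watermark = event_ts - 30000, the watermark is forwarded right after its element,
// and it is forwarded only if it is larger than the last one.
final class PunctuatedWatermarkSim {
    static final long MAX_OUT_OF_ORDERNESS_MS = 30_000L;

    /** Watermark the downstream process function would see as each event arrives. */
    static List<Long> watermarkSeenPerEvent(long[] eventTimestamps) {
        List<Long> seen = new ArrayList<>();
        long current = Long.MIN_VALUE; // no watermark has arrived yet
        for (long ts : eventTimestamps) {
            seen.add(current); // the element reaches processElement before its own watermark
            long next = ts - MAX_OUT_OF_ORDERNESS_MS;
            if (next > current) {
                current = next;
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] fromLog = {1583128014000L, 1583128048000L, 1583128089000L};
        System.out.println(watermarkSeenPerEvent(fromLog));
    }
}
```

With the three timestamps from the log, it prints [-9223372036854775808, 1583127984000, 1583128018000]. In other words, only the first event should see Long.MIN_VALUE. A 5-minute timer registered at the first event (1583128314000) would fire after an event with a timestamp of at least 1583128344000 has been processed.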




Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

Dawid Wysakowicz-2

Hi Anuj,

What parallelism does your source have? Do all of your source tasks produce records? The watermark is always the minimum of the watermarks received from all upstream operators. Therefore, if some of them do not produce records, the watermark will not progress. You can read more about watermarks and how they work here: https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
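The hypothetical class below illustrates this rule. It models an operator with several input channels, where the operator's event-time clock is the minimum of the latest watermark received on each channel. It is a simplified sketch that leaves out Flink's handling of idle sources. As long as one channel has sent nothing, the combined watermark stays at Long.MIN_VALUE.

```java
import java.util.Arrays;

// Hypothetical, simplified model of watermark propagation in parallel streams:
// the operator's event-time clock is the minimum of the latest watermark seen on
// each input channel. Idle-source handling is deliberately left out.
final class MinWatermarkSim {
    private final long[] latestPerChannel;

    MinWatermarkSim(int channels) {
        if (channels < 1) {
            throw new IllegalArgumentException("need at least one input channel");
        }
        latestPerChannel = new long[channels];
        Arrays.fill(latestPerChannel, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one channel and returns the operator's combined watermark. */
    long onWatermark(int channel, long watermark) {
        // Watermarks on a single channel never go backwards.
        latestPerChannel[channel] = Math.max(latestPerChannel[channel], watermark);
        return current();
    }

    /** The combined watermark: the minimum over all channels. */
    long current() {
        return Arrays.stream(latestPerChannel).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkSim op = new MinWatermarkSim(2);
        System.out.println(op.onWatermark(0, 1583127984000L)); // channel 1 still silent
        System.out.println(op.onWatermark(1, 1583128018000L)); // both channels have reported
    }
}
```

The first call returns Long.MIN_VALUE because channel 1 has not reported anything yet. The operator's clock only moves after every channel has sent a watermark, and then it follows the slowest channel.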

Hope that helps

Best,

Dawid

On 02/03/2020 16:26, aj wrote:


Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

anuj.aj07
Hi Dawid,

Currently, I am testing with a single source and parallelism 1 only, so I am not able to understand this behavior.

On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz <[hidden email]> wrote:


--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07




Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

rmetzger0
Side note: this question has also been asked on Stack Overflow: https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808
(I'm mentioning this here so that our community doesn't spend its support resources debugging the same issue twice.)

On Mon, Mar 2, 2020 at 5:36 PM aj <[hidden email]> wrote:

Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

anuj.aj07
Thanks, Robert, for mentioning this. I will take care of it in future posts.

I was able to figure out the issue: when I disable checkpointing, the watermark gets updated and everything works. I need to understand two things:

1. What happens when I enable checkpointing, and how can I make this work with checkpointing enabled? I need to write the data stream with checkpointing enabled.

2. Basically, I want to collect all the data of a session and process all of its events at the end of the session (after x minutes of inactivity).
I know this functionality is available in session windows, where I can create a session window using an inactivity gap, but enriching and processing events there is not recommended. So how can I get the same behavior myself: trigger on the inactivity period, process all the events, and clear the queue?
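One common way to get session-window-like behavior in a KeyedProcessFunction is to keep the buffered events in keyed state and move a single event-time timer to lastEventTimestamp + gap on every event. When that timer fires, the key has been inactive for the whole gap, so its events can be processed and the state cleared. The plain-Java model below sketches only that bookkeeping. The class is hypothetical and is not the Flink API.

Note that this sketch keys the deadline on the latest event. The code in the first post instead registers the timer at startTime + 5 minutes, which measures time since the session started rather than inactivity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of an inactivity-gap session: per-key buffered events
// plus one pending event-time deadline per key, fired when the watermark reaches it.
final class InactivitySessions {
    private final long gapMs;
    private final Map<String, List<Long>> buffered = new HashMap<>(); // per-key event timestamps
    private final Map<String, Long> timerAt = new HashMap<>();        // per-key pending deadline

    InactivitySessions(long gapMs) {
        this.gapMs = gapMs;
    }

    /** Buffers an event and pushes the key's deadline to lastEventTs + gap. */
    void onEvent(String key, long eventTs) {
        buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(eventTs);
        timerAt.merge(key, eventTs + gapMs, Long::max); // keep only the latest deadline
    }

    /** Fires every deadline <= watermark, returning those sessions and clearing their state. */
    Map<String, List<Long>> onWatermark(long watermark) {
        Map<String, List<Long>> closed = new HashMap<>();
        timerAt.entrySet().removeIf(e -> {
            if (e.getValue() <= watermark) {
                closed.put(e.getKey(), buffered.remove(e.getKey()));
                return true;
            }
            return false;
        });
        return closed;
    }
}
```

In real Flink code, the older timers could be removed with TimerService.deleteEventTimeTimer. Alternatively, they could be ignored in onTimer by comparing the firing timestamp with the stored deadline. In both cases, timers only fire if the watermark actually advances.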


Thanks,
Anuj


On Tue, Mar 3, 2020 at 3:40 AM Robert Metzger <[hidden email]> wrote: