Debugging watermarks?

Debugging watermarks?

Niels Basjes

Hi,

I was working on a streaming application last week and got stuck in a situation where the same time-based window was emitted many times.
I suspect I made a mistake in my watermark-generating code, so that the watermarks it creates do not match the data I have.

Writing the events to the console (for debugging) is easy, yet I have not been able to write the watermarks to my console.

My question is very simple: how do I log the watermarks to the console so I can see them alongside the data and understand my mistake?

I would also like to know "where do the watermarks live" in relation to the actual data.

Thanks.

Niels Basjes

Re: Debugging watermarks?

Aljoscha Krettek
Hi Niels,
you can log the watermarks by implementing a custom operator. (Operators have access to the watermarks.) The map operator is a good example of this:

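import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    // Called once per record: apply the user's MapFunction and forward the result.
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

    // Called once per watermark: this is the place to print/log it before forwarding.
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        output.emitWatermark(mark);
    }
}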

In processWatermark() you would print/log the watermark. You can write a simple identity operator that just forwards everything and prints the watermarks, and insert it anywhere in the pipeline.
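As a rough, Flink-free illustration of that identity-operator idea, and of where watermarks sit relative to the data: in Flink, watermarks travel through the same stream as the records, interleaved between them. The sketch below models this with stand-in types. StreamElement, Event, Watermark and logWatermarks are hypothetical names made up for the illustration, not Flink classes; a real operator would extend Flink's operator classes as StreamMap does above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Stand-in model only (an assumption for illustration, NOT Flink's API):
// events and watermarks share one ordered channel.
final class WatermarkLoggingSketch {

    /** Hypothetical stream element: either an Event or a Watermark. */
    interface StreamElement {}

    static final class Event implements StreamElement {
        final long timestamp;
        final String value;

        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    static final class Watermark implements StreamElement {
        final long timestamp;

        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    /**
     * Identity stage: forwards every element unchanged and in order,
     * and reports each watermark it sees to the given logger.
     */
    static List<StreamElement> logWatermarks(List<StreamElement> input,
                                             Consumer<String> logger) {
        List<StreamElement> output = new ArrayList<>();
        for (StreamElement element : input) {
            if (element instanceof Watermark) {
                logger.accept("watermark=" + ((Watermark) element).timestamp);
            }
            output.add(element); // events and watermarks are both forwarded
        }
        return output;
    }

    public static void main(String[] args) {
        // Watermarks are interleaved with the events they follow.
        List<StreamElement> stream = Arrays.asList(
                new Event(1000L, "a"),
                new Event(1500L, "b"),
                new Watermark(1200L),
                new Event(2100L, "c"),
                new Watermark(2000L));

        List<StreamElement> forwarded = logWatermarks(stream, System.out::println);
        System.out.println("forwarded " + forwarded.size() + " elements");
    }
}
```

Running main prints watermark=1200, then watermark=2000, then "forwarded 5 elements". Because the stage changes nothing, the same logging can be placed between any two steps to compare the watermarks before and after a given step.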


-Aljoscha

In reply to Niels Basjes's message of Sat, 21 May 2016 at 16:05.

Re: Debugging watermarks?

Stephan Ewen
Hi Niels!

It may also be interesting for you to know that with the extension of the metrics and the web frontend, watermarks should be easily trackable in the near future, via JMX metrics, or a tab in the Flink dashboard.

Stephan


In reply to Aljoscha Krettek's message of Sat, May 21, 2016 at 5:15 PM.

Re: Debugging watermarks?

Niels Basjes
Thanks guys,
Using the above code as a reference I was quickly able to find the problems in my code.

Niels Basjes

In reply to Stephan Ewen's message of Sun, May 22, 2016 at 2:00 PM.

--
Best regards / Met vriendelijke groeten,

Niels Basjes