RuntimeException with valve output watermark when using CoGroup

Taneli Saastamoinen
Morning everyone,

I'm getting the following exception in my Flink job (Flink version is 1.5.0):

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)

I'm not sure why this is happening but I suspect it could be a bug in Flink.

My code is too proprietary to be shared directly, but here's the general gist. I'm getting data in as JSON, parsing it into POJOs, and then aggregating those with coGroup(), taking the maximum of two separate fields. I then take the results of this and aggregate it again, taking the average of these maximums grouped by a different field. In pseudocode:

// first aggregation

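final DataStream<PojoClass> parsed = source
    .flatMap(new JsonConverterAndFilterer())
    .assignTimestampsAndWatermarks(new MyTimestampExtractor(MAX_OUT_OF_ORDERNESS));

final DataStream<PojoClass> X = parsed.filter(q -> q.getX() != null);

final DataStream<AggregatedPojoClass> Y = parsed.filter(q -> q.getY() != null).map(AggregatedPojoClass::new);

final DataStream<AggregatedPojoClass> joined = X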
    .coGroup(Y)
    .where(PojoClass::getId)
    .equalTo(AggregatedPojoClass::getId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(findMaximums());

// findMaximums() returns:

@Override
public void coGroup(Iterable<PojoClass> xMaybe, Iterable<AggregatedPojoClass> yMaybe, Collector<AggregatedPojoClass> collector) throws Exception {
    final Iterator<PojoClass> x = xMaybe.iterator();
    final Iterator<AggregatedPojoClass> y = yMaybe.iterator();
    if(x.hasNext() && y.hasNext()) {
        final PojoClass maxX = findMaxX(x);
        final AggregatedPojoClass maxY = findMaxY(y);
        if(maxX != null && maxY != null) {
            collector.collect(new AggregatedPojoClass(maxX).updateMaxY(maxY.getY()));
        } else {
            log.warn("[CoGroup case 1] Max X or max Y is null - SKIPPING: max x {}, max y {}", maxX, maxY);
        }
    } // ...other cases omitted for brevity...
}

// second aggregation

final DataStream<AggregatedPojoClass> result = joined
    .keyBy(AggregatedPojoClass::getSecondaryId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(new WindowAverageFunction());

final DataStream<String> resultJson = result
    .map(JsonUtil::toJson);
// then a simple sink on this

// WindowAverageFunction is:

public class WindowAverageFunction implements WindowFunction<AggregatedPojoClass, AggregatedPojoClass, String, TimeWindow> {
    @Override
    public void apply(String secondaryId, TimeWindow timeWindow, Iterable<AggregatedPojoClass> input, Collector<AggregatedPojoClass> out) throws Exception {
        final Iterator<AggregatedPojoClass> i = input.iterator();
        if(!i.hasNext()) {
            log.warn("Got empty window for secondary id '{}' - ignoring...", secondaryId);
            return;
        }
        // calculate some simple averages of the X and Y...
    }
}


Now when I run this code with some real data, an exception happens on line 88 here, inside the custom coGroup() function above (I've added line numbers to clarify):

87        if(maxY != null) {
88            collector.collect(maxY);
89        } else {
90            log.warn("[CoGroup case 3] Max Y null - SKIPPING");
91        }

The stack trace is as follows:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.mycorp.flink.AggregateStuffFlinkJob$1.coGroup(AggregateStuffFlinkJob.java:88)
at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:683)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
Caused by: java.lang.NullPointerException
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39).
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39) [FAILED]
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) c3a09b0645721f8afd02a57d6a24ea39.



Am I doing something wrong? Or is this a bug? Any ideas appreciated.

Cheers,

Re: RuntimeException with valve output watermark when using CoGroup

Chesnay Schepler
At first glance this looks like a bug. Is there nothing in the stack trace
after the NullPointerException?

How reliably can you reproduce this?

On 27.07.2018 19:00, Taneli Saastamoinen wrote:

> [...]



Re: RuntimeException with valve output watermark when using CoGroup

Taneli Saastamoinen
On 27 July 2018 at 19:21, Chesnay Schepler wrote:
> At first glance this looks like a bug. Is there nothing in the stack trace after the NullPointerException?

Hmm, there is actually, sorry about that:

Caused by: java.lang.NullPointerException
at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 22 more

> How reliably can you reproduce this?

100%, though I've only run this job a handful of times. What's frustrating is that I cannot easily reproduce this in a unit test (all my unit tests pass). On production data it happens every time and almost instantly, but our data volumes are large enough that it's difficult to dig into it further.

For now I think I'll split the job in two: the first aggregation will write to Kafka, and the second aggregation will run as a separate job that reads its input from Kafka. When I run only the first aggregation, no errors occur, so the issue seems to be the combination of the two aggregations.

Cheers,


Re: RuntimeException with valve output watermark when using CoGroup

Taneli Saastamoinen
To return to this old thread: this was basically user error. The second transformation was keying by a field that was sometimes null in the output of the first transformation. (This was never supposed to happen, but it did in production.)
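Given that diagnosis, the straightforward fix is to drop (or divert) records whose key field is null before the keyed stage. In the job above that would presumably be `joined.filter(p -> p.getSecondaryId() != null).keyBy(AggregatedPojoClass::getSecondaryId)`, using the existing `filter` and `keyBy` methods of `DataStream`. The plain-Java sketch below models the idea without Flink; `Row`, `NullKeyFilter`, `route` and the modulo routing are hypothetical and deliberately simpler than Flink's key-group assignment (`KeyGroupRangeAssignment` in the trace above). It only shows that hash-based routing has to call `hashCode()` on the key, so a null key throws, and that filtering first avoids it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record holding only the field used as the key of the second aggregation.
final class Row {
    final String secondaryId; // may be null in unexpected production data

    Row(String secondaryId) {
        this.secondaryId = secondaryId;
    }
}

final class NullKeyFilter {
    // Simplified hash routing (NOT Flink's algorithm): it calls key.hashCode(),
    // so it throws NullPointerException when the key is null.
    static int route(String key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // The guard: keep only rows whose key is present, before any routing happens.
    static List<Row> withKey(List<Row> rows) {
        List<Row> kept = new ArrayList<>();
        for (Row row : rows) {
            if (row.secondaryId != null) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```

Silently dropping records can hide the underlying data problem, so logging them or sending them to a side output may be preferable, depending on requirements.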

The confusing part is where the exception occurs. The NullPointerException happens because of, and in, the second transformation, but in my example here the stack trace points to the first transformation. Of course Flink doesn't execute the lines literally like that (i.e. there is optimisation going on), so the true location of the error is obscured.
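One possible reading of the location, offered as an editorial interpretation rather than something confirmed in this thread, comes from the second stack trace: `KeyGroupStreamPartitioner.selectChannels` is called from `RecordWriter.emit`, which is called from `RecordWriterOutput.pushToRecordWriter`, which in turn is reached from the coGroup function's `collector.collect(...)` on line 88. That suggests the partitioner for the downstream `keyBy` runs in the upstream (coGroup) task when it emits a record, so the second aggregation's key is computed inside the first aggregation's `collect()`. The plain-Java sketch below mimics that arrangement with hypothetical `Joined`, `KeyedOutput` and `UpstreamOperator` types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical output record of the first aggregation.
final class Joined {
    final String id;
    final String secondaryId; // key of the second aggregation; may be null

    Joined(String id, String secondaryId) {
        this.id = id;
        this.secondaryId = secondaryId;
    }
}

// Hypothetical model of a keyed exchange: the sending side applies the
// downstream key selector and hashes the key to pick a target channel.
final class KeyedOutput {
    private final Function<Joined, String> downstreamKey;
    private final List<List<Joined>> channels = new ArrayList<>();

    KeyedOutput(Function<Joined, String> downstreamKey, int numChannels) {
        this.downstreamKey = downstreamKey;
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    // Runs in the upstream task. A null key makes key.hashCode() throw here,
    // so the failure surfaces inside the upstream operator's collect() call.
    void collect(Joined record) {
        String key = downstreamKey.apply(record);
        int channel = Math.floorMod(key.hashCode(), channels.size());
        channels.get(channel).add(record);
    }

    int totalRecords() {
        int total = 0;
        for (List<Joined> channel : channels) {
            total += channel.size();
        }
        return total;
    }
}

// Hypothetical upstream operator, standing in for the coGroup function.
final class UpstreamOperator {
    static void emitAll(List<Joined> results, KeyedOutput out) {
        for (Joined result : results) {
            out.collect(result); // plays the role of collector.collect(...) on line 88
        }
    }
}
```

Under this reading, the failure belongs to the second aggregation's key but is reported from the first aggregation's task, which would also be consistent with the earlier observation that running only the first aggregation produced no errors.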

I tried to create a small reproducible example of this, but only managed to get a situation where the NullPointerException very clearly points to the second transformation. I'm not sure how to reproduce the weirder version of the error, since it seems to depend on the query optimiser, which in turn might depend on data volumes, POJO structures, etc.

In any case, errors like this can of course be easily detected and fixed with proper unit tests; I just didn't originally have full coverage for unexpected, partially-null data.
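One way to get that coverage, sketched here as an assumption rather than what the author actually did, is to pull the key extraction into a small plain-Java helper that rejects a null key with a descriptive message, and to unit-test it with partially-null records. `Result`, `Keys.requireKey` and `KeysTest` below are hypothetical. In a Flink job the same check could sit inside the `KeySelector` passed to `keyBy`, so a bad record fails with a message naming the field and the record instead of a bare `NullPointerException` from `KeyGroupRangeAssignment`.

```java
import java.util.function.Function;

// Hypothetical stand-in for AggregatedPojoClass after the first aggregation.
final class Result {
    final String id;
    final String secondaryId; // expected non-null, but production data disagreed

    Result(String id, String secondaryId) {
        this.id = id;
        this.secondaryId = secondaryId;
    }

    @Override
    public String toString() {
        return "Result{id=" + id + ", secondaryId=" + secondaryId + "}";
    }
}

final class Keys {
    // Wraps a key extractor so a null key fails fast with a message that names
    // the field and the offending record, instead of a bare NullPointerException.
    static <T, K> Function<T, K> requireKey(String fieldName, Function<T, K> extractor) {
        return value -> {
            K key = extractor.apply(value);
            if (key == null) {
                throw new IllegalArgumentException(
                        "Key field '" + fieldName + "' is null for record " + value);
            }
            return key;
        };
    }
}

// Minimal framework-free test: one complete record and one partially-null record.
final class KeysTest {
    public static void main(String[] args) {
        Function<Result, String> bySecondaryId = Keys.requireKey("secondaryId", r -> r.secondaryId);

        if (!"b".equals(bySecondaryId.apply(new Result("1", "b")))) {
            throw new AssertionError("expected key 'b'");
        }
        try {
            bySecondaryId.apply(new Result("2", null));
            throw new AssertionError("expected a null key to be rejected");
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```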

Cheers,



On Mon, 30 Jul 2018 at 10:21, Taneli Saastamoinen wrote:
> [...]


Re: RuntimeException with valve output watermark when using CoGroup

Till Rohrmann
Thanks for the update, Taneli. Glad that you solved the problem. If you find out more about the more obscure case, let us know. Maybe there is something we can still improve to prevent misleading exceptions in the future.

Cheers,
Till

On Tue, Jan 1, 2019 at 3:01 PM Taneli Saastamoinen wrote:
> [...]