Previously working job fails on Flink 1.2.0

Previously working job fails on Flink 1.2.0

Steffen Hausmann
Hi there,

I’m having problems running a job on Flink 1.2.0 that executes
successfully on Flink 1.1.3. The job is supposed to read events from a
Kinesis stream and send its output to Elasticsearch. It starts
successfully on a Flink 1.2.0 cluster running on YARN, but as soon as I
start ingesting events into the Kinesis stream, the job fails (see the
attachment for more information):

java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
    at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)

Any ideas what’s going wrong here? The job executes successfully when
it’s compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3
cluster. Does this indicate a bug in my code or is this rather a bug in
Flink? How can I further debug this?

Any guidance is highly appreciated.

Thanks,

Steffen


Re: Previously working job fails on Flink 1.2.0

Stefan Richter
Hi,

Flink 1.2 partitions all keys into key groups, the atomic units for rescaling. This partitioning is done by hashing and is kept in sync with the routing of tuples to operator instances: each parallel instance of a keyed operator is responsible for some range of key groups. The exception means that Flink detected a tuple in the state backend of a parallel operator instance that should not be there, because by its key hash it belongs to a different key group, or, phrased differently, to a different parallel operator instance.

Whether this is a Flink bug or a user code bug is very hard to tell, and the log does not provide additional insight. I could see this happening if your keys are mutable and your code changes the key object in a way that changes its hash code. Another question: did you migrate your job from Flink 1.1.3 through an old savepoint, or did you do a fresh start?

Other than that, I recommend checking your code for mutation of keys. If the failure is deterministic, you could also set a breakpoint at the line of the exception and check whether the key that is about to be inserted is somehow special.
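
For illustration, here is a minimal, plain-Java sketch of how a mutable key can break this invariant (not taken from your job; the key class is made up):

import java.util.Objects;

// Hypothetical key class whose hash depends on a mutable field.
final class TaxiKey {
    String license; // mutable on purpose, for illustration

    TaxiKey(String license) { this.license = license; }

    @Override public int hashCode() { return Objects.hashCode(license); }
    @Override public boolean equals(Object o) {
        return o instanceof TaxiKey && Objects.equals(((TaxiKey) o).license, license);
    }
}

public class MutableKeyDemo {
    public static void main(String[] args) {
        TaxiKey key = new TaxiKey("ABC-123");
        int hashWhenRouted = key.hashCode();    // hash used to route the tuple to an operator instance

        key.license = "XYZ-999";                // user code mutates the key afterwards
        int hashAtStateAccess = key.hashCode(); // hash used to determine the key group in the backend

        // Different hashes map to different key groups -> "Unexpected key group index"
        System.out.println(hashWhenRouted == hashAtStateAccess); // prints false
    }
}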

Best,
Stefan




Re: Previously working job fails on Flink 1.2.0

Steffen Hausmann
Thanks for these pointers, Stefan.

I've started a fresh job and didn't migrate any state from a previous
execution. Moreover, all the fields of all the events I'm using are
declared final.

I've set a breakpoint to figure out which event is causing the problem,
and it turns out that Flink processes the incoming events for some
time; only when a certain window triggers is an exception thrown. The
specific code that causes the exception is as follows:

> DataStream<IdleDuration> idleDuration = cleanedTrips
>         .keyBy("license")
>         .flatMap(new DetermineIdleDuration())
>         .filter(duration -> duration.avg_idle_duration >= 0 && duration.avg_idle_duration <= 240)
>         .keyBy("location")
>         .timeWindow(Time.minutes(10))
>         .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input, Collector<IdleDuration> out) -> {
>             double[] location = Iterables.get(input, 0).location;
>             double avgDuration = StreamSupport
>                     .stream(input.spliterator(), false)
>                     .mapToDouble(idle -> idle.avg_idle_duration)
>                     .average()
>                     .getAsDouble();
>
>             out.collect(new IdleDuration(location, avgDuration, window.maxTimestamp()));
>         });
If the apply statement is removed, there is no exception during runtime.

The location field referenced by the keyBy statement is actually a
double[]. Could this be causing the problems I'm experiencing?

You can find some more code for additional context in the attached document.

Thanks for looking into this!

Steffen




Attachment: snipplet.java (3K)

Re: Previously working job fails on Flink 1.2.0

Stefan Richter
Hi,

If your key is a double[], then even if the field is declared final, the key is still mutable, because the array entries can be changed, and maybe that is what happened. You can check whether the following two points are in sync, hash-wise: KeyGroupStreamPartitioner::selectChannels and AbstractKeyedStateBackend::setCurrentKey. The first method determines to which parallel operator instance a tuple is routed in a keyed stream; the second determines the tuple’s key group for the state backend. Both must agree on the key group computed for the tuple, and this assignment is based on the hash of the key.

Therefore, the hash of the tuple’s key must never change, i.e. the key must effectively be immutable. If you can observe a change in hash code, that change is what breaks your job. I am pretty sure Flink 1.1.x would just silently accept a mutation of the key, but that is arguably incorrect behavior.
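
As a quick, self-contained illustration of why a double[] key makes those two sides disagree (plain Java, just to demonstrate the hashing behavior; the coordinates are made up):

import java.util.Arrays;

public class ArrayKeyHashDemo {
    public static void main(String[] args) {
        double[] keyOnSender = {40.7128, -74.0060};
        // After network transfer, the receiver deserializes the key into a new
        // array object with identical contents:
        double[] keyOnReceiver = Arrays.copyOf(keyOnSender, keyOnSender.length);

        // Arrays inherit Object.hashCode(), which is identity-based, so the two
        // sides compute different hashes and therefore different key groups:
        System.out.println(keyOnSender.hashCode() == keyOnReceiver.hashCode());             // false (almost always)

        // A content-based hash would be stable across serialization:
        System.out.println(Arrays.hashCode(keyOnSender) == Arrays.hashCode(keyOnReceiver)); // true
    }
}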

Best,
Stefan



Re: Previously working job fails on Flink 1.2.0

Stephan Ewen
@Steffen

Yes, you currently cannot use arrays as keys. There is a missing check that would give you a proper error message for this.

The double[] is hashed on the sender side before the record is sent. Java's hash code for an array does not take its contents into account but is based on the object's identity (effectively its memory address), which makes the hash non-deterministic across serialization.
When the double[] is re-hashed on the receiver, you get a different hash, which is detected as a key-group violation.

In fact, your program was probably behaving incorrectly before as well; now you at least get an error message for it...
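
A minimal sketch of a workaround (the IdleDuration class and its location field are taken from your snippet; this is just one possible approach): derive a value-based key from the array's contents instead of keying by the array object itself.

import java.util.Arrays;

import org.apache.flink.api.java.functions.KeySelector;

// Sketch only: the same coordinates always produce the same key (and hash),
// regardless of array object identity.
public class LocationKeySelector implements KeySelector<IdleDuration, String> {
    @Override
    public String getKey(IdleDuration value) {
        // Arrays.toString is content-based and stable across serialization,
        // unlike the array's identity hash code.
        return Arrays.toString(value.location);
    }
}

You would then use .keyBy(new LocationKeySelector()) in place of .keyBy("location").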






Re: Previously working job fails on Flink 1.2.0

rmetzger0
I've filed a JIRA for the problem: https://issues.apache.org/jira/browse/FLINK-5874





Re: Previously working job fails on Flink 1.2.0

Steffen Hausmann
Thanks Stefan and Stephan for your comments. I changed the type of the field and now the job seems to be running again.

And thanks Robert for filing the Jira!
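
In case it helps others: the change essentially boils down to keying by a value-based type instead of the raw double[]. A simplified sketch of such a key class (not necessarily the exact code I used; the two components are just placeholders for whatever the array held):

// Value-based key: plain double fields with content-based equals/hashCode,
// so partitioning and state access always agree on the key group.
public class Location {
    public double longitude;
    public double latitude;

    public Location() {}    // Flink POJOs need a public no-argument constructor

    public Location(double longitude, double latitude) {
        this.longitude = longitude;
        this.latitude = latitude;
    }

    @Override
    public int hashCode() {
        return 31 * Double.hashCode(longitude) + Double.hashCode(latitude);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Location)) {
            return false;
        }
        Location other = (Location) o;
        return longitude == other.longitude && latitude == other.latitude;
    }
}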

Cheers,
Steffen

