Blink SQL java.lang.ArrayIndexOutOfBoundsException

Blink SQL java.lang.ArrayIndexOutOfBoundsException

liujiangang
       I am using Roaring64NavigableMap to compute UV. It works with the Flink planner but fails with the Blink planner. The SQL is as follows:
SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp,
       A, B, C, D, E, uv(bitmap(id)) as bmp
FROM person
GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E

      The UDFs are as follows:
public static class Bitmap extends AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
    @Override
    public Roaring64NavigableMap createAccumulator() {
        return new Roaring64NavigableMap();
    }

    @Override
    public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
        return accumulator;
    }

    public void accumulate(Roaring64NavigableMap bitmap, long id) {
        bitmap.add(id);
    }
}

public static class UV extends ScalarFunction {
    public long eval(Roaring64NavigableMap bitmap) {
        return bitmap.getLongCardinality();
    }
}
      The error is as follows:

2020-04-20 16:37:13,868 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph      [flink-akka.actor.default-dispatcher-40]  - GroupWindowAggregate(groupBy=[brand, platform, channel, versionName, appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand, platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS curTimestamp, brand, platform, channel, versionName, appMajorVersion, uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink: Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING to FAILED.
java.lang.ArrayIndexOutOfBoundsException: -1
  at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
  at org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
  at org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
  at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
  at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
  at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
  at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
  at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
  at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
  at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
  at org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
  at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
  at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
  at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
  at java.lang.Thread.run(Thread.java:745)

      Do I need to register Roaring64NavigableMap somewhere? Can anyone help me? Thank you.
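
       For reference, if registration were the concern, the type can be made known to Flink's Kryo configuration up front. This is only a sketch (the class name is made up and the commented-out serializer class is hypothetical); as the replies below point out, the root cause here turned out to be a planner bug rather than a missing registration:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class RegisterRoaringType {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Tell Flink's Kryo configuration about the type up front.
        env.getConfig().registerKryoType(Roaring64NavigableMap.class);
        // Or register a dedicated Kryo serializer for it (hypothetical serializer class):
        // env.getConfig().registerTypeWithKryoSerializer(Roaring64NavigableMap.class, SomeRoaringSerializer.class);
    }
}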

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

Zahid Rahman
You can read this for this type of error.


I would suggest you set breakpoints in your code and step through it. That should show you which array variable is being passed a null argument where the variable is not nullable.




Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

Kurt Young
Can you reproduce this in a local program with a mini-cluster?

Best,
Kurt
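
A minimal local-reproduction harness along the lines Kurt suggests could look roughly like the sketch below. It assumes Flink 1.9/1.10 APIs and the Bitmap and UV classes from the first post on the classpath; the class name is made up, and a tiny bounded source stands in for the real person table (actually triggering the failure would need a continuous, high-volume generator):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class BlinkAioobeRepro {
    public static void main(String[] args) throws Exception {
        // A local environment runs on an embedded mini-cluster; keep some parallelism.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Blink planner in streaming mode, as in the failing job.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // The UDFs from the first post.
        tEnv.registerFunction("bitmap", new Bitmap());
        tEnv.registerFunction("uv", new UV());

        // Tiny stand-in source for "person" (columns simplified to id, A and an event timestamp).
        DataStream<Tuple3<Long, String, Long>> input = env
                .fromElements(
                        Tuple3.of(1L, "a", 1000L),
                        Tuple3.of(2L, "a", 2000L),
                        Tuple3.of(3L, "b", 3000L))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<Long, String, Long> element) {
                        return element.f2;
                    }
                });
        tEnv.registerDataStream("person", input, "id, A, ts, eventTime.rowtime");

        Table result = tEnv.sqlQuery(
                "SELECT TUMBLE_START(eventTime, INTERVAL '1' MINUTE) AS curTimestamp, A, "
                        + "uv(bitmap(id)) AS bmp "
                        + "FROM person "
                        + "GROUP BY TUMBLE(eventTime, INTERVAL '1' MINUTE), A");

        tEnv.toAppendStream(result, Row.class).print();
        env.execute("blink-aioobe-repro");
    }
}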


Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

Jark Wu-3
Hi,

Are you using a version < 1.9.2? From the exception stack, it looks like it is caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0.
Could you try it using 1.9.2?

Best,
Jark

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

liujiangang
Thank you. It is an online job and my input is huge. I checked the trace and found that the array is resized when it is not large enough. The code is as below:

// com.esotericsoftware.kryo.util.IntArray#add, from the top of the stack trace above
public void add (int value) {
    int[] items = this.items;
    if (size == items.length) items = resize(Math.max(8, (int) (size * 1.75f)));
    items[size++] = value;
}

Only the Blink planner has this error. Could it be a thread-safety problem or something else? I will try to reproduce it locally.

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

Kurt Young
Thanks. Once you can reproduce this issue locally, please open a JIRA with your test program.

Best,
Kurt


Re: Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

forideal
Hi Kurt,
    I hit the same error.

    SQL:
    insert into dw_access_log
    select get_json_value(query_nor, query_nor_counter) as `value`
    from ods_access_log_source
    group by tumble(time_key, interval '1' MINUTE), group_key

get_json_value:
public class GetJsonValue extends AggregateFunction<String, Map<String, Long>> {
    @Override
    public boolean isDeterministic() {
        return false;
    }

    @Override
    public Map<String, Long> createAccumulator() {
        return new HashMap<>();
    }

    @Override
    public void open(FunctionContext context) throws Exception {
    }

    public void accumulate(Map<String, Long> datas, String key, long value) {
        datas.put(key, value);
    }

    @Override
    public String getValue(Map<String, Long> acc) {
        return JSON.toJSONString(acc);
    }

    @Override
    public TypeInformation getResultType() {
        return Types.STRING;
    }
}

Best,
forideal




Re: Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

Jingsong Li
Hi,

As Jark said, it may be FLINK-13702 [1]. It has been fixed in 1.9.2 and later versions.

> Could it be a thread-safety problem or something else?

Yes, it is a thread-safety problem with lazy materialization.


Best,
Jingsong Lee
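
To illustrate the kind of problem being described (this is only an assumption about the failure mode for illustration, not the actual Flink internals): a single Kryo instance is not thread-safe, so concurrent use can corrupt its internal reference-tracking IntArray and surface exactly as the ArrayIndexOutOfBoundsException in IntArray.add shown above. A minimal sketch of the unsafe pattern:

import com.esotericsoftware.kryo.Kryo;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class KryoNotThreadSafe {
    public static void main(String[] args) {
        Kryo kryo = new Kryo();                      // one shared instance -- not thread-safe
        Roaring64NavigableMap bitmap = new Roaring64NavigableMap();
        bitmap.add(42L);

        Runnable copyTask = () -> {
            for (int i = 0; i < 100_000; i++) {
                kryo.copy(bitmap);                   // concurrent copies may corrupt Kryo's state
            }
        };
        new Thread(copyTask).start();
        new Thread(copyTask).start();
    }
}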

Re: Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

Jingsong Li
Hi,

Sorry for the mistake: [1] is related, but this bug was only fully fixed in [2], so the safe versions are 1.9.3+ and 1.10.1+; there is no safe released version yet.

1.10.1 will be released very soon.


--
Best, Jingsong Lee
Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

liujiangang
      Thank you very much. It solved my problem.
