I am using Roaring64NavigableMap to compute UV. It works with the Flink planner but fails with the Blink planner. The SQL is as follows:
SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, A, B, C, D,

The UDF is as follows:

public static class Bitmap extends AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {

public static class UV extends ScalarFunction {

The error is as follows:

2020-04-20 16:37:13,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [flink-akka.actor.default-dispatcher-40] - GroupWindowAggregate(groupBy=[brand, platform, channel, versionName, appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand, platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS curTimestamp, brand, platform, channel, versionName, appMajorVersion, uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink: Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING to FAILED.
java.lang.ArrayIndexOutOfBoundsException: -1
    at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
    at org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
    at org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
    at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
    at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
    at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
    at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
    at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
    at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
    at org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:745)

Do I need to register Roaring64NavigableMap somewhere? Can anyone help me? Thank you.
|
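For readers following along, the shape of the two functions in the question can be sketched without Flink or RoaringBitmap on the classpath. This is only an illustration under assumptions: `DistinctIdAggregate` and `UvFunction` are hypothetical names, a `TreeSet<Long>` stands in for `Roaring64NavigableMap`, and the method names mirror the usual aggregate-function contract (`createAccumulator`, `accumulate`, `merge`, `getValue`). The bodies of the poster's real `Bitmap` and `UV` classes are not shown in the thread.

```java
import java.util.TreeSet;

/** Hypothetical stand-in for the Bitmap aggregate; the accumulator doubles as the result. */
class DistinctIdAggregate {
    /** Creates an empty accumulator (a TreeSet here, a bitmap in the real job). */
    TreeSet<Long> createAccumulator() {
        return new TreeSet<>();
    }

    /** Adds one id; the set silently ignores duplicates. */
    void accumulate(TreeSet<Long> acc, long id) {
        acc.add(id);
    }

    /** Folds partial accumulators into the first one. */
    void merge(TreeSet<Long> acc, Iterable<TreeSet<Long>> others) {
        for (TreeSet<Long> other : others) {
            acc.addAll(other);
        }
    }

    /** Returns the accumulator unchanged, matching a result type equal to the accumulator type. */
    TreeSet<Long> getValue(TreeSet<Long> acc) {
        return acc;
    }
}

/** Hypothetical stand-in for the UV scalar function: set of ids in, distinct count out. */
class UvFunction {
    long eval(TreeSet<Long> ids) {
        return ids.size();
    }
}
```

Because the aggregate emits the accumulator object itself rather than a primitive, the planner has to serialize and copy a generic object, which matches the Kryo `copy` frames in the stack trace above.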
You can read this for this type of error. I would suggest setting breakpoints in your code and stepping through it; that should show you which array variable is being passed a null argument when the array variable is not nullable. On Mon, 20 Apr 2020, 10:07 刘建刚, <[hidden email]> wrote:
|
Can you reproduce this in a local program with mini-cluster? Best, Kurt On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <[hidden email]> wrote:
|
Hi, Are you using a version < 1.9.2? From the exception stack, it looks like it is caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0. Could you try it using 1.9.2? Best, Jark On Mon, 20 Apr 2020 at 21:00, Kurt Young <[hidden email]> wrote:
|
Thank you. It is an online job and my input is huge. I checked the stack trace and found that the array is resized when its capacity is not enough. The code is as below:
public void add (int value) {

Only the Blink planner has this error. Can it be a thread-safe problem or something else? I will try to reproduce it locally.
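To make the resize remark concrete, here is a minimal growable int array. It is a sketch, not Kryo's source: the class name `GrowableIntArray`, the initial capacity of 8, and the 1.75 growth factor are all assumptions chosen for illustration. It shows that the resize path alone never produces a negative index; an index of -1 can only appear if the size counter is already corrupted when `add` is called.

```java
/** Hypothetical growable int array; illustrative only, not Kryo's IntArray. */
class GrowableIntArray {
    int[] items = new int[8];
    int size = 0;

    /** Appends a value, growing the backing array when it is full. */
    void add(int value) {
        if (size == items.length) {
            // Growth factor is an arbitrary choice for this sketch.
            int[] bigger = new int[Math.max(8, (int) (size * 1.75f))];
            System.arraycopy(items, 0, bigger, 0, size);
            items = bigger;
        }
        // With size == -1 this line throws ArrayIndexOutOfBoundsException.
        items[size++] = value;
    }

    /** Removes and returns the last value; has no bounds check of its own. */
    int pop() {
        return items[--size];
    }
}
```

In this sketch an unbalanced `pop()` on an empty array leaves `size` at -1, after which `add` fails with an out-of-bounds index of -1. The unbalanced `pop()` is only a device to reach that state; how the counter would get there inside Kryo is not shown in the thread, though unsynchronized use of one instance from several threads is one plausible route.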
|
Thanks. Once you can reproduce this issue locally, please open a Jira issue with your test program. Best, Kurt On Tue, Apr 21, 2020 at 8:48 AM 刘建刚 <[hidden email]> wrote:
|
Hi Kurt, I hit the same error. SQL:
insert into dw_access_log
select
get_json_value(query_nor, query_nor_counter) as `value`
from
ods_access_log_source
group by
tumble (time_key, interval '1' MINUTE),
group_key

get_json_value:
public class GetJsonValue extends AggregateFunction<String, Map<String, Long>> {

Best, forideal At 2020-04-21 10:05:05, "Kurt Young" <[hidden email]> wrote:
|
Hi, Just like Jark said, it may be FLINK-13702[1]. It has been fixed in 1.9.2 and later versions. > Can it be a thread-safe problem or something else? Yes, it is a thread-safety problem with lazy materialization. Best, Jingsong Lee On Tue, Apr 21, 2020 at 1:21 PM forideal <[hidden email]> wrote:
Best, Jingsong Lee |
Hi, Sorry for the mistake. [1] is related, but this bug was only fully fixed in [2], so the safe versions are 1.9.3+ and 1.10.1+. That means there is no safe released version right now. 1.10.1 will be released very soon. On Wed, Apr 22, 2020 at 4:50 PM Jingsong Li <[hidden email]> wrote:
Best, Jingsong Lee |
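The version guidance above can be turned into a small check, for example to fail fast in a deployment script. This is a sketch under assumptions: `FlinkVersionCheck` and `hasFix` are hypothetical names, the input is expected to be a plain `major.minor.patch` string, and treating release lines after 1.10 as fixed is an assumption that the thread does not state.

```java
/** Hypothetical helper encoding "safe versions are 1.9.3+ and 1.10.1+". */
class FlinkVersionCheck {
    /** Parses a "major.minor.patch" string into three integers. */
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected major.minor.patch: " + version);
        }
        int[] numbers = new int[3];
        for (int i = 0; i < 3; i++) {
            numbers[i] = Integer.parseInt(parts[i]);
        }
        return numbers;
    }

    /** Returns true if the version is in a range the thread describes as safe. */
    static boolean hasFix(String version) {
        int[] v = parse(version);
        if (v[0] != 1) {
            return v[0] > 1; // assumption: later major versions keep the fix
        }
        if (v[1] == 9) {
            return v[2] >= 3; // 1.9.3+
        }
        if (v[1] == 10) {
            return v[2] >= 1; // 1.10.1+
        }
        return v[1] > 10; // assumption: release lines after 1.10 keep the fix
    }
}
```

For example, `FlinkVersionCheck.hasFix("1.9.2")` and `FlinkVersionCheck.hasFix("1.10.0")` return false, while `"1.9.3"` and `"1.10.1"` return true.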
Thank you very much. It solved my problem.
|