BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly


Smile
Hi all,

I'm trying to add mini-batch optimizations for the regular join
(flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java)
in the Blink planner, and some test cases fail, such as
AggregateITCase.testGroupBySingleValue.

After debugging, I found that the heap memory backing
BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from [0,0,0,0,0,0,0,0]
to [3,0,0,0,0,0,0,0], which led to some records being assigned a wrong key.

However, my mini-batch code doesn't contain any low-level operations on
MemorySegment. I only buffer some records (RowData) in a Map, just as
AbstractMapBundleOperator does. Object reuse was also disabled via
env.getConfig.disableObjectReuse(). It looks as if
StreamOneInputProcessor.processInput modified memory segments that do not
belong to it (they belong to BinaryRowDataUtil.EMPTY_ROW instead). A
debugging screenshot with more information is attached.
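Roughly, the buffering looks like the sketch below. This is a simplified
illustration only: plain Strings stand in for RowData, and the class and
method names (MiniBatchBuffer, addRecord, flush) are made up for this
example, not the actual operator code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the buffering described above; Strings stand in for
// RowData, and all names are illustrative, not the actual operator code.
public class MiniBatchBuffer {
    private final Map<String, List<String>> bundle = new HashMap<>();
    private final int maxBundleSize;
    private int count = 0;

    public MiniBatchBuffer(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    // Buffer one record under its key; flush once the bundle is full.
    public void addRecord(String key, String record) {
        bundle.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        count++;
        if (count >= maxBundleSize) {
            flush();
        }
    }

    // In the real operator the bundle would be processed here; this sketch
    // just prints the buffered records and clears the bundle.
    public void flush() {
        bundle.forEach((key, records) -> System.out.println(key + " -> " + records));
        bundle.clear();
        count = 0;
    }

    public int bufferedCount() {
        return count;
    }
}
```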

I'm not familiar with org.apache.flink.core.memory.MemorySegment or
sun.misc.Unsafe, so I'd like to ask the mailing list for help. Do you have
any idea why this happens, or where to check next?

Thank you.

Smile

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png>



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly

Chesnay Schepler
This is a bit concerning. Could you re-run your test with assertions
enabled, and/or modify BinaryRowData#assertIndexIsValid to always throw an
error when one of the two assertions is not met?
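For example, the hardened check could look roughly like this. This is a
sketch only, not the actual Flink code: the real assertIndexIsValid is an
instance method on BinaryRowData (without the arity parameter) that uses
Java `assert` statements, which are no-ops unless the JVM runs with -ea.

```java
// Sketch only: replacing the `assert` statements with unconditional checks
// so that invalid accesses fail loudly even without -ea. The static method
// with an explicit arity parameter is an adaptation for this example.
public class IndexChecks {
    public static void assertIndexIsValid(int index, int arity) {
        if (index < 0) {
            throw new IllegalArgumentException(
                    "index (" + index + ") should be >= 0");
        }
        if (index >= arity) {
            throw new IllegalArgumentException(
                    "index (" + index + ") should be < arity (" + arity + ")");
        }
    }
}
```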

On 5/11/2021 9:37 AM, Smile wrote:

> [quoted message trimmed]



Re: BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly

Smile
Hi Chesnay Schepler,

Thank you for your reply.
I found the problem just now.

My code modifies the key obtained from the KeySelector by updating its
RowKind. Some key selectors, such as BinaryRowDataKeySelector, return a copy
of the key [1], but EmptyRowDataKeySelector always returns the same
object [2].

The test case AggregateITCase.testGroupBySingleValue, with the SQL query
"SELECT * FROM T2 WHERE T2.a < (SELECT count(*) * 0.3 FROM T1)", is in fact
a global join without a key, so when I call mykey.setRowKind(RowKind.DELETE),
the shared BinaryRowDataUtil.EMPTY_ROW object is modified and all records
with an empty key get the wrong key.

Should EmptyRowDataKeySelector also return a copy of
BinaryRowDataUtil.EMPTY_ROW? Otherwise, the key must never be modified,
because it may also be shared by other records.

Smile

[1].
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowDataKeySelector.java#L49
[2].
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/EmptyRowDataKeySelector.java#L36
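The aliasing can be reproduced with a minimal self-contained sketch. The
Row and RowKind types below are simplified stand-ins for this example, not
the real Flink classes:

```java
// Simplified stand-ins for Flink's RowData and RowKind; not the real API.
enum RowKind { INSERT, DELETE }

class Row {
    RowKind kind = RowKind.INSERT;

    Row copy() {
        Row r = new Row();
        r.kind = this.kind;
        return r;
    }
}

public class SharedKeyDemo {
    // Mirrors EmptyRowDataKeySelector: every call returns the same object.
    static final Row EMPTY_ROW = new Row();

    static Row sharedKey() {
        return EMPTY_ROW;
    }

    // Mirrors BinaryRowDataKeySelector: every call returns a fresh copy.
    static Row copiedKey() {
        return EMPTY_ROW.copy();
    }

    public static void main(String[] args) {
        Row k1 = sharedKey();
        k1.kind = RowKind.DELETE;  // mutating the shared key...
        // ...changes the key that every other record sees:
        System.out.println("shared key kind: " + sharedKey().kind);  // DELETE

        EMPTY_ROW.kind = RowKind.INSERT;  // reset for comparison
        Row c1 = copiedKey();
        c1.kind = RowKind.DELETE;  // only the copy is affected
        System.out.println("copied key kind: " + copiedKey().kind);  // INSERT
    }
}
```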



Chesnay Schepler wrote

> [quoted message trimmed]

Re: BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly

Chesnay Schepler
I'm not so sure myself, so I'm pulling in Timo to help out.

I would think that we shouldn't create copies of EMPTY_ROW, because sharing
it is a nice optimization. I think the deeper issue is that, while all other
mutating methods check the arity (implicitly preventing modifications to
EMPTY_ROW), setRowKind() does not. So maybe this case just wasn't
considered. Conceptually, it doesn't make sense to me to allow modifying the
RowKind of EMPTY_ROW.
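A guard along those lines could look roughly like this hypothetical sketch
(not the actual BinaryRowData implementation; the GuardedRow class and the
byte encoding of the kind are simplifications for illustration):

```java
// Hypothetical sketch, not the actual Flink implementation: reject RowKind
// changes on a zero-arity row, mirroring how the arity checks in the other
// mutators implicitly protect the shared EMPTY_ROW singleton.
public class GuardedRow {
    private final int arity;
    private byte rowKind;  // simplified encoding: 0 = INSERT, 2 = DELETE

    public GuardedRow(int arity) {
        this.arity = arity;
    }

    public void setRowKind(byte kind) {
        if (arity == 0) {
            throw new UnsupportedOperationException(
                    "The RowKind of the shared empty row must not be modified");
        }
        this.rowKind = kind;
    }

    public byte getRowKind() {
        return rowKind;
    }
}
```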

On 5/11/2021 1:09 PM, Smile wrote:

> [quoted message trimmed]



Re: BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly

Chesnay Schepler
/cc Timo

On 5/11/2021 1:25 PM, Chesnay Schepler wrote:

> [quoted message trimmed]