Hi all,
I'm trying to add mini-batch optimizations for the regular join (flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java) in the Blink planner, and some test cases fail, such as AggregateITCase.testGroupBySingleValue.

After debugging, I found that the heap memory backing BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from [0,0,0,0,0,0,0,0] to [3,0,0,0,0,0,0,0], which led to some records being assigned a wrong key.

However, my mini-batch code doesn't do any low-level work with MemorySegment. I only buffered some records (RowData) in a Map, just like AbstractMapBundleOperator does. Object reuse was also disabled via env.getConfig.disableObjectReuse(). It looks as if StreamOneInputProcessor.processInput modified memory segments that do not belong to it (they belong to BinaryRowDataUtil.EMPTY_ROW instead). A debugging screenshot with more information is attached.

I'm not familiar with org.apache.flink.core.memory.MemorySegment or sun.misc.Unsafe, so I'd like to ask the mailing list for help. Do you have any idea why this happens, or where to check next?

Thank you.

Smile

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png>
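For reference, a minimal sketch of the buffering pattern described above, loosely modeled on AbstractMapBundleOperator. The class and method names are illustrative only, not the actual patch:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.data.RowData;

/** Minimal sketch of per-key mini-batch buffering (illustrative names). */
public class MiniBatchJoinBuffer {

    // Latest record buffered per join key until the bundle is flushed.
    private final Map<RowData, RowData> bundle = new HashMap<>();

    /** Buffers (or folds) the newest record for the given key. */
    public void addRecord(RowData key, RowData record) {
        bundle.put(key, record);
    }

    /** Drains the bundle, e.g. when a count or time trigger fires. */
    public Map<RowData, RowData> finishBundle() {
        Map<RowData, RowData> drained = new HashMap<>(bundle);
        bundle.clear();
        return drained;
    }
}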
This is a bit concerning. Could you re-run your test with assertions enabled, and/or modify BinaryRowData#assertIndexIsValid to always throw an error if one of the two assertions is not met?
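A hedged sketch of that modification, turning the two JVM assert statements into hard checks that fire even without the -ea flag (the messages follow the current Flink source; the exact exception type is an assumption):

// Sketch: harden BinaryRowData#assertIndexIsValid so it fails even
// when the JVM runs without assertions enabled. The exception type
// here is chosen for illustration.
private void assertIndexIsValid(int index) {
    if (index < 0) {
        throw new IllegalArgumentException("index (" + index + ") should >= 0");
    }
    if (index >= arity) {
        throw new IllegalArgumentException("index (" + index + ") should < " + arity);
    }
}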
Hi Chesnay Schepler,
Thank you for your reply. I found the problem just now.

My code modifies the key obtained from the KeySelector by updating its RowKind. Some key selectors, such as BinaryRowDataKeySelector, return a copy of the key[1], but EmptyRowDataKeySelector always returns the same object[2].

The test case AggregateITCase.testGroupBySingleValue, with the SQL query "SELECT * FROM T2 WHERE T2.a < (SELECT count(*) * 0.3 FROM T1)", is effectively a global join without a key. So when I call myKey.setRowKind(RowKind.DELETE), the BinaryRowDataUtil.EMPTY_ROW object itself is changed, and every record with an empty key gets the wrong key.

Should EmptyRowDataKeySelector also return a copy of BinaryRowDataUtil.EMPTY_ROW? Otherwise the key must never be modified, because it may also be used by other records.

Smile

[1]. https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowDataKeySelector.java#L49
[2]. https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/EmptyRowDataKeySelector.java#L36
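To make the aliasing concrete, a minimal standalone sketch (class and package names follow recent Flink sources; this is an illustration, not the failing test itself):

import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.types.RowKind;

public class SharedEmptyRowDemo {
    public static void main(String[] args) {
        // Both "keys" alias the same shared singleton, since
        // EmptyRowDataKeySelector never copies it.
        BinaryRowData key1 = BinaryRowDataUtil.EMPTY_ROW;
        BinaryRowData key2 = BinaryRowDataUtil.EMPTY_ROW;

        // Mutating one key silently changes the other, because
        // setRowKind() writes the kind byte (DELETE = 3) into the
        // row's backing memory segment -- the observed change from
        // [0,0,0,0,0,0,0,0] to [3,0,0,0,0,0,0,0].
        key1.setRowKind(RowKind.DELETE);
        System.out.println(key2.getRowKind()); // prints DELETE, not INSERT
    }
}

A defensive alternative on the caller side would be to copy the key before mutating it (e.g. via BinaryRowData#copy), at the cost of one allocation per record.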
I'm not so sure myself, so I'm pulling in Timo to help out.
I would think that we shouldn't create copies of EMPTY_ROW, because sharing it is a nice optimization. I think the deeper issue is that, while all the other mutating methods check the arity (implicitly preventing modifications to EMPTY_ROW), setRowKind() does not. So maybe this case just wasn't considered. Conceptually, it doesn't make sense to me to allow modifying the RowKind of EMPTY_ROW.
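A hedged sketch of such a guard inside BinaryRowData (the singleton identity check is one possible approach, not an actual Flink fix; segments and offset are the row's existing fields):

// Sketch only: reject RowKind changes on the shared zero-arity
// singleton, mirroring how the field setters implicitly protect it
// through assertIndexIsValid(). Would live inside BinaryRowData.
@Override
public void setRowKind(RowKind kind) {
    if (this == BinaryRowDataUtil.EMPTY_ROW) {
        throw new UnsupportedOperationException(
                "The RowKind of the shared EMPTY_ROW singleton must not be changed.");
    }
    segments[0].put(offset, kind.toByteValue());
}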
/cc Timo