Hi,
I have a use case where I am keeping the keyed state in ProcessFunction.
I have encountered a lot of ConcurrentModificationExceptions. I thought Flink processes all the operators on a keyed stream in a single thread, but it seems the operators are being accessed from multiple threads.

When I get such an exception, the data coming from Kafka is consumed without the internal state being updated, so I lose that data.

Please help me handle this for my use case.

Thanks,
Garvit Sharma
github.com/garvitlnmiit/
No Body is a Scholar by birth, its only hard work and strong determination that makes him master.
Hi Garvit,

this is unexpected. Could you please provide more information?

- Which Flink version are you using?
- Which state backend are you using?
- Are you using incremental checkpoints (in case you use the RocksDB backend)?
- Did you create a custom thread to operate on the state?

The exception log would also definitely help a lot if you could share it with us.

Best,
Sihua

On 06/02/2018 12:08, [hidden email] wrote:
> Hi,
>
> I have a use case where I am keeping the keyed state in ProcessFunction.
>
> Key: Integer personId;
>
> /**
>  * The data type stored in the state
>  */
> public class PersonDetails {
>     public long count;
>     public long lastModified;
> }
>
> I have encountered a lot of ConcurrentModificationException.
>
> I thought Flink processes all the operators on a keyed stream in a single
> thread. It seems like operators being accessed through multiple threads.
>
> If I get such exception then the data coming from Kafka would be consumed
> without making an update to the internal state. Making me lose the data.
>
> Please help me in handling the case according to my use case.
Hi Garvit Sharma,
Flink runs each parallel subtask with a single thread. Can you show a little code illustrating how you use the keyed state in your ProcessFunction?

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
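As a side note on the single-thread point: a ConcurrentModificationException does not by itself prove that several threads are involved. The plain-Java sketch below (no Flink involved; the class and method names are made up for illustration) raises it from one thread by adding a key to a HashMap while iterating over it. It is not Flink's internal code path, only an illustration of how the exception can arise.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of Flink.
final class CmeDemo {

    /**
     * Iterates over the map's keys and inserts a new key on each step.
     * Returns true if that raised a ConcurrentModificationException.
     */
    static boolean mutateWhileIterating(Map<Integer, Long> counts) {
        try {
            for (Integer personId : counts.keySet()) {
                // Structural change (a new key) while the iterator is still open.
                counts.put(personId + 1000, 0L);
            }
            return false; // nothing to iterate over, so nothing went wrong
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> counts = new HashMap<>();
        counts.put(1, 10L);
        counts.put(2, 20L);
        counts.put(3, 30L);
        System.out.println("CME raised: " + mutateWhileIterating(counts));
    }
}
```

The same fail-fast check fires if something else iterates over a collection (for example, to serialize it) while it is being modified, which is why the exception alone does not tell you how many threads touched the state.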
Hi,

Sorry for the delay, guys. I was trying to reproduce the complete error on my local machine but could not. I will try again with actual traffic and let you know the exception stack trace.

For now, I have the following details available:

Flink version: 1.4.1
State backend: Heap

I am not creating a custom thread to operate on the state.

I will reply with the stack trace soon.

Thanks,
Garvit Sharma
Hi Garvit Sharma,

I just read your code snippet, and I think it looks OK. We may need your reproduction scenario and the exception stack trace then.

Best wishes,
Aitozi

Garvit Sharma wrote
> Flink version: 1.4.1
> State backend: Heap
>
> I am not creating a custom thread to operate the state.
>
> Code snippet :
> https://gist.github.com/garvitlnmiit/10db9d4b6eb41135332fba13d908e36c
Hi Garvit,

thanks for your feedback. I see you are using 1.4.1 with the Heap state backend. There are actually two bugs in 1.4.1, related to the Kryo serializer and DefaultOperatorStateBackend, which may cause the ConcurrentModificationException (when checkpointing). Both have been fixed in 1.5. The related issues are https://issues.apache.org/jira/browse/FLINK-8836 and https://issues.apache.org/jira/browse/FLINK-9263.

If your case is caused only by FLINK-8836 and you still wish to use 1.4.1, then a workaround would be to use the RocksDB backend instead. But if it is caused by FLINK-9263, I think the best choice might be to upgrade Flink to 1.5.

Hope this helps.

Best,
Sihua
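For the RocksDB workaround, a minimal flink-conf.yaml sketch might look like the following. The key names are as I recall them from the 1.4 configuration documentation, and the checkpoint path is only a placeholder, so please verify both against your own setup before relying on this.

```yaml
# Assumed Flink 1.4 key names; check them against the docs for your version.
state.backend: rocksdb
# Placeholder path; must be reachable from the JobManager and all TaskManagers.
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
```

Setting the backend in the cluster configuration makes it the default for jobs that do not choose one in code; a job that sets its own state backend programmatically would need to be changed there instead.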