ConcurrentModificationException while accessing managed keyed state


Garvit Sharma
Hi,

I have a use case where I keep keyed state in a ProcessFunction.

Key: Integer personId;
/**
 * The data type stored in the state
 */
public class PersonDetails {
    public long count;
    public long lastModified;
}

I am encountering a lot of ConcurrentModificationExceptions.

I thought Flink processes all the operators on a keyed stream in a single thread, but it seems the operators are being accessed from multiple threads.

When I get such an exception, the data coming from Kafka is consumed without the internal state being updated, so I lose data.

Please help me handle this for my use case.

Thanks,

--

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that makes him master.

Re: ConcurrentModificationException while accessing managed keyed state

gerryzhou

Hi Garvit,

This is unexpected. Could you please provide more information?

- Which Flink version are you using?
- Which state backend are you using?
- Are you using incremental checkpoints (in case you use the RocksDB backend)?
- Did you create a custom thread that operates on the state?

The exception log would also help a lot if you could share it with us.

Best, Sihua



Re: ConcurrentModificationException while accessing managed keyed state

aitozi
In reply to this post by Garvit Sharma
Hi Garvit Sharma,

Flink runs each parallel instance of an operator in a single thread. Can you
show a little of the code where you use the keyed state in your ProcessFunction?
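
Note that a ConcurrentModificationException does not by itself mean that several threads are involved: the JDK collections also throw it when a single thread structurally modifies a collection while iterating over it. The sketch below shows this in plain Java, without Flink. The class SingleThreadCme, its methods, and the personId -> count map are made up for illustration and are not taken from the job discussed in this thread.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

class SingleThreadCme {

    /**
     * Removes zero-count entries while iterating over the key set.
     * Returns true if the iteration failed with a ConcurrentModificationException.
     * Fail-fast detection is best-effort in general; with the sample map below
     * the removal happens on the first of three entries, so the next iterator
     * step notices it.
     */
    static boolean removeWhileIterating(Map<Integer, Long> counts) {
        try {
            for (Integer personId : counts.keySet()) {
                if (counts.get(personId) == 0L) {
                    // Structural change that bypasses the iterator driving this loop.
                    counts.remove(personId);
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Same clean-up, routed through the collection's own removal support. */
    static void removeSafely(Map<Integer, Long> counts) {
        counts.values().removeIf(count -> count == 0L);
    }

    /** Hypothetical sample data: personId -> count. */
    static Map<Integer, Long> sample() {
        Map<Integer, Long> counts = new HashMap<>();
        counts.put(1, 0L);
        counts.put(2, 5L);
        counts.put(3, 7L);
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("CME in one thread: " + removeWhileIterating(sample()));

        Map<Integer, Long> counts = sample();
        removeSafely(counts);
        System.out.println("After safe removal: " + counts);
    }
}
```

Whether anything like this happens in the job above cannot be told without the stack trace, which shows the frames where the exception was raised.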

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: ConcurrentModificationException while accessing managed keyed state

Garvit Sharma
Hi,

Sorry for the delay. I was trying to reproduce the complete error on my local machine but could not. I will try again with actual traffic and send you the exception stack trace.

For now, I have the following details available to me.

Flink version: 1.4.1
State backend: Heap

I am not creating a custom thread to operate on the state.

Code snippet: https://gist.github.com/garvitlnmiit/10db9d4b6eb41135332fba13d908e36c

I will reply with the stack trace soon.

Thanks,




Re: ConcurrentModificationException while accessing managed keyed state

aitozi
Hi Garvit Sharma,

I just read your code snippet, and I think it looks OK. We may need your
reproduction scenario and the exception stack trace then.

Best wishes
Aitozi


Garvit Sharma wrote

> Hi,
>
> Sorry for the delay. I was trying to reproduce the complete error on my
> local machine but could not. I will try again with actual traffic and send
> you the exception stack trace.
>
> For now, I have the following details available to me.
>
> Flink version: 1.4.1
> State backend: Heap
>
> I am not creating a custom thread to operate on the state.
>
> Code snippet:
> https://gist.github.com/garvitlnmiit/10db9d4b6eb41135332fba13d908e36c
>
> I will reply with the stack trace soon.
>
> Thanks,

Re: ConcurrentModificationException while accessing managed keyed state

gerryzhou
In reply to this post by Garvit Sharma
Hi Garvit,

Thanks for your feedback. I see you are using 1.4.1 with the heap state backend. There are actually two bugs in 1.4.1, related to the Kryo serializer and DefaultOperatorStateBackend, that may cause a ConcurrentModificationException (when checkpointing). Both have been fixed in 1.5. The related issues are https://issues.apache.org/jira/browse/FLINK-8836 and https://issues.apache.org/jira/browse/FLINK-9263. If your case is caused only by FLINK-8836 and you still wish to use 1.4.1, then a workaround would be to use the RocksDB backend instead. But if it is caused by FLINK-9263, I think the best choice might be to upgrade Flink to 1.5.

Hope this helps.

Best, Sihua
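
The reply above ties the exception to checkpointing. As a general illustration only (this is not Flink's implementation, and it does not reproduce FLINK-8836 or FLINK-9263), the following plain-Java sketch shows one common way to let a background thread read state while the processing thread keeps mutating it: take a copy on the processing thread and hand only the copy to the other thread. SnapshotSketch and all of its members are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class SnapshotSketch {

    // Hypothetical stand-in for keyed state: personId -> count.
    private final Map<Integer, Long> counts = new HashMap<>();

    /** Called only from the single processing thread. */
    void increment(int personId) {
        counts.merge(personId, 1L, Long::sum);
    }

    long get(int personId) {
        return counts.getOrDefault(personId, 0L);
    }

    /** Copy taken on the processing thread; the live map is never shared. */
    Map<Integer, Long> snapshot() {
        return new HashMap<>(counts);
    }

    /**
     * Sums the snapshot on a second thread while the calling thread keeps
     * updating the live state. The reader only ever sees the private copy.
     */
    static long sumAsync(SnapshotSketch live, Map<Integer, Long> snapshot) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        long[] sum = new long[1];
        Thread reader = new Thread(() -> {
            try {
                for (long count : snapshot.values()) {
                    sum[0] += count;
                }
            } catch (Throwable t) {
                failure.set(t);
            }
        });
        reader.start();
        for (int personId = 0; personId < 1000; personId++) {
            live.increment(personId); // processing continues during the read
        }
        try {
            reader.join(); // join() makes the reader's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (failure.get() != null) {
            throw new IllegalStateException(failure.get());
        }
        return sum[0];
    }

    public static void main(String[] args) {
        SnapshotSketch state = new SnapshotSketch();
        state.increment(1);
        state.increment(1);
        state.increment(2);
        Map<Integer, Long> snapshot = state.snapshot();
        System.out.println("snapshot total = " + sumAsync(state, snapshot));
        System.out.println("live count for key 1 = " + state.get(1));
    }
}
```

Copying the whole map is the simplest variant and costs time proportional to the state size; it is shown here only because it keeps the idea visible in a few lines.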





