s3 statebackend user state size


s3 statebackend user state size

Chen Qin
Hi there,

With S3 as the state backend and a large chunk of user state kept on the heap, I can see task managers start to fail without showing an OOM exception. Instead, a generic error message (below) appears when a checkpoint is triggered. I assume this has something to do with how state is kept in a buffer and flushed to S3 when the checkpoint is triggered.

Further, to keep a large key/value space, the wiki points to using RocksDB as the backend. My understanding is that RocksDB writes to the local file system instead of syncing to S3. Does Flink support a memory -> RocksDB (local disk) -> S3 checkpoint state split yet? Or would implementing the KvState interface let Flink take care of the large-state problem?
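(For context, a minimal sketch of the kind of keyed user state in question; the operator and names are hypothetical and use the Flink 1.0-era DataStream API.)

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical operator: keeps one running count per key. With many keys this
// "user state" can grow well beyond the heap, which is what the question is about.
public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<Long>("count", Long.class, 0L));
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        long updated = count.value() + in.f1;
        count.update(updated);
        out.collect(Tuple2.of(in.f0, updated));
    }
}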

Chen

java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager eddbcda03a61f61210063a7cd2148b36 @ 10.163.98.18 - 24 slots - URL: akka.tcp://flink@10.163.98.18:6124/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
    at org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
    at akka.actor.ActorCell.invoke(ActorCell.scala:486)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: s3 statebackend user state size

Ufuk Celebi
On Tue, May 10, 2016 at 5:07 PM, Chen Qin <[hidden email]> wrote:
> Further, to keep a large key/value space, the wiki points to using RocksDB
> as the backend. My understanding is that RocksDB writes to the local file
> system instead of syncing to S3. Does Flink support a memory -> RocksDB
> (local disk) -> S3 checkpoint state split yet? Or would implementing the
> KvState interface let Flink take care of the large-state problem?

Hey Chen,

when you use RocksDB, you only need to explicitly configure the file
system checkpoint directory, for which you can use S3:

new RocksDBStateBackend(new URI("s3://..."))

The local disk paths are configured via the general Flink temp
directory configuration (see taskmanager.tmp.dirs in
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html;
the default is /tmp).

State is written to the local RocksDB instance and the RocksDB files
are copied to S3 on checkpoints.
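Put together, a minimal job skeleton for that setup might look like the following (a sketch only; the bucket path and checkpoint interval are placeholders, not values from this thread):

import java.net.URI;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBOnS3Sketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint data is copied to S3 (placeholder bucket/path); working state
        // lives in local RocksDB files under the taskmanager.tmp.dirs directories.
        env.setStateBackend(new RocksDBStateBackend(new URI("s3://my-bucket/flink-checkpoints")));

        // Trigger a checkpoint every 60 seconds (placeholder interval).
        env.enableCheckpointing(60 * 1000);

        // ... define sources, keyed operators, and sinks, then call env.execute(...)
    }
}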

Does this help?

– Ufuk

Re: s3 statebackend user state size

Chen Qin
Hi Ufuk,

Yes, it does help with the RocksDB backend!
After tuning the checkpoint frequency to align with network throughput, the "task manager released" and "job cancelled" failures are gone.
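
For reference, that kind of tuning can be expressed roughly as follows (a sketch with illustrative values, not the settings actually used here):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint less frequently so each RocksDB snapshot has time to reach S3.
        env.enableCheckpointing(5 * 60 * 1000);

        // Keep a minimum pause between checkpoints so uploads do not pile up.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);

        // Allow slow uploads more time before a checkpoint is considered failed.
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);

        // ... rest of the job ...
    }
}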

Chen

