Hi Yun,
Thanks for the help. After applying your recommendation I am still getting the same issue: very long checkpoints, followed by a timeout.
My guess now is that the datagen source is pushing its checkpoint state over the network to the JM. Is there a way to double-check this?
If that is the case, is there a way to exclude the source operators from the checkpoints?
Thanks
Please find the attached logs:
1. I checked the shared folder and it does contain the shared operator state.
2. I set the value of fs-memory-threshold to 1kb.
This is the source of the SQL test job:
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
f_random_str_4 STRING,
f_random_str_3 STRING,
f_random_str_2 STRING,
f_random_str_1 STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='500000',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='200000000',
'fields.f_random.min'='1',
'fields.f_random.max'='100',
'fields.f_random_str.length'='100000',
'fields.f_random_str_4.length'='100000',
'fields.f_random_str_3.length'='100000',
'fields.f_random_str_2.length'='100000',
'fields.f_random_str_1.length'='100000'
);
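For a sense of scale, here is a rough back-of-the-envelope sketch in Python of the data volume this table definition asks for. It is not part of the job. It assumes roughly 1 byte per character of each random string and ignores the two INT fields and the timestamp, so the numbers are only an order-of-magnitude estimate.

```python
# Rough size estimate for the datagen table defined above.
# Assumption: about 1 byte per character of each random string;
# the INT fields and the timestamp are ignored.
STRING_FIELDS = 5          # f_random_str, f_random_str_1 .. f_random_str_4
STRING_LENGTH = 100_000    # 'fields.<name>.length'
ROWS_PER_SECOND = 500_000  # 'rows-per-second'

bytes_per_row = STRING_FIELDS * STRING_LENGTH
bytes_per_second = bytes_per_row * ROWS_PER_SECOND

print(f"about {bytes_per_row / 1e3:.0f} kB per row")
print(f"about {bytes_per_second / 1e9:.0f} GB/s requested from the source")
```

Under these assumptions each row is about 500 kB and the configured rate corresponds to about 250 GB/s, so the source will most likely run at whatever rate the cluster can sustain rather than at the configured one.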
---------------------------------------
With more debugging I see this exception stack trace on the job manager:
java.io.IOException: The rpc invocation size 199965215 exceeds the maximum akka framesize.
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79) [flink-dist_2.11-1.11.1.jar:1.11.1]
at com.sun.proxy.$Proxy25.acknowledgeCheckpoint(Unknown Source) [?:?]
at org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder.acknowledgeCheckpoint(RpcCheckpointResponder.java:46) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.state.TaskStateManagerImpl.reportTaskStateSnapshots(TaskStateManagerImpl.java:117) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:160) [flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:121) [flink-dist_2.11-1.11.1.jar:1.11.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
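To put the size in the exception message into perspective, here is a small Python sketch that compares it with the RPC frame size limit. The limit used here is an assumption: it is the value I believe to be the default for akka.framesize (10485760b), and the actual value in this cluster's configuration may differ.

```python
# Compare the RPC payload from the exception with the akka frame size limit.
RPC_INVOCATION_BYTES = 199_965_215    # taken from the exception message above
ASSUMED_FRAMESIZE_BYTES = 10_485_760  # assumption: default akka.framesize (10485760b)

payload_mib = RPC_INVOCATION_BYTES / 2**20
ratio = RPC_INVOCATION_BYTES / ASSUMED_FRAMESIZE_BYTES

print(f"payload: {payload_mib:.1f} MiB")
print(f"payload is {ratio:.1f}x the assumed frame size limit")
```

If that assumption holds, the checkpoint acknowledgement is roughly 190 MiB, about 19 times larger than the limit, which suggests that a large amount of state or state metadata is being sent inline to the JM instead of being written to the checkpoint directory.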
----------------------------------------------
And sometimes the JM dies with this OOM:
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236) ~[?:1.8.0_172]
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[?:1.8.0_172]
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[?:1.8.0_172]
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[?:1.8.0_172]
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) ~[?:1.8.0_172]
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) ~[?:1.8.0_172]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) ~[?:1.8.0_172]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_172]
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:324) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:324) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:324) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:324) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:53) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:906) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:906) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:905) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:793) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.EndpointWriter.delegate$1(Endpoint.scala:682) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.EndpointWriter.writeLoop$1(Endpoint.scala:693) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.EndpointWriter.sendBufferedMessages(Endpoint.scala:706) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.EndpointWriter$$anonfun$3.applyOrElse(Endpoint.scala:637) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.11.1.jar:1.11.1]