Executing Flink server From IntelliJ

Boris Lublinsky
Hi,
I am trying to upgrade my project from Flink 1.2 to 1.3 and am running into problems when running a Flink server from my IntelliJ project. The code:

import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Execute on the local Flink server - to test queryable state
def executeServer(): Unit = {

  // We use a mini cluster here for the sake of simplicity, because I don't want
  // to require a Flink installation to run this demo. Everything should be
  // contained in this JAR.

  val port = 6124
  val parallelism = 4
  val config = new Configuration()
  config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
  config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
  config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
  // In a non-MiniCluster setup, queryable state is enabled by default.
  config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true)

  // Create a local Flink server
  val flinkCluster = new LocalFlinkMiniCluster(config, false)
  try {
    // Start server and create environment
    flinkCluster.start(true)
    val env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", port, parallelism)
    // Build graph
    buildGraph(env)
    env.execute()
    val jobGraph = env.getStreamGraph.getJobGraph
    // Submit to the server and wait for completion
    flinkCluster.submitJobAndWait(jobGraph, false)
  } catch {
    case e: Exception => e.printStackTrace()
  }
}
This worked on version 1.2, but on 1.3 I get:

08:41:29,179 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster.
08:41:29,431 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
08:41:29,498 INFO  Remoting                                                      - Starting remoting
08:41:29,730 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@localhost:6124]
08:41:29,762 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/blobStore-4e626961-9155-47e9-b1b8-f835a8435cfc
08:41:29,765 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:54319 - max concurrent requests: 50 - max backlog: 1000
08:41:29,775 INFO  org.apache.flink.runtime.metrics.MetricRegistry               - No metrics reporter configured, no metrics will be exposed/reported.
08:41:29,781 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist           - Started memory archivist akka://flink/user/archive
08:41:29,786 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Starting JobManager at akka.tcp://flink@localhost:6124/user/jobmanager.
08:41:29,787 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.jobmanager.JobManager@59cd5ef5 @ akka.tcp://flink@localhost:6124/user/jobmanager
08:41:29,796 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
08:41:29,804 INFO  Remoting                                                      - Starting remoting
08:41:29,813 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@localhost:54320]
08:41:29,825 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
08:41:29,830 INFO  Remoting                                                      - Starting remoting
08:41:29,836 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@localhost:54321]
08:41:29,846 INFO  org.apache.flink.runtime.jobmanager.JobManager                - JobManager akka.tcp://flink@localhost:6124/user/jobmanager was granted leadership with leader session ID Some(61d3ed9b-1c24-4bbf-99ef-c2a891613473).
08:41:29,847 INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka.tcp://flink@localhost:6124/user/jobmanager , session=61d3ed9b-1c24-4bbf-99ef-c2a891613473
08:41:29,850 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
08:41:29,851 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Received leader address but not running in leader ActorSystem. Cancelling registration.
08:41:29,855 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Temporary file directory '/var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T': total 464 GB, usable 353 GB (76.08% usable)
08:41:30,493 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 363 MB for network buffer pool (number of memory segments: 11634, bytes per segment: 32768).
08:41:30,506 INFO  org.apache.flink.runtime.io.network.NetworkEnvironment        - Starting the network environment and its components.
08:41:30,508 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Limiting managed memory to 1145 MB, memory will be allocated lazily.
08:41:30,512 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/flink-io-9ec461ff-086d-45cd-b69c-e9890217d8fc for spill files.
08:41:30,514 INFO  org.apache.flink.runtime.metrics.MetricRegistry               - No metrics reporter configured, no metrics will be exposed/reported.
08:41:30,561 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/flink-dist-cache-4c4e9bcf-5a66-43e7-b2e9-244f310c3c4c
08:41:30,570 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/flink-dist-cache-687f7d57-33d7-4df3-915f-481008043fef
08:41:30,575 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor at akka://flink/user/taskmanager#-1401663761.
08:41:30,576 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager data connection information: 15d5b91a66be806304e6fe15fde8c0fe @ localhost (dataPort=-1)
08:41:30,576 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager has 4 task slot(s).
08:41:30,578 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 391/838/3641 MB, NON HEAP: 25/26/-1 MB (used/committed/max)]
08:41:30,582 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@localhost:6124/user/jobmanager (attempt 1, timeout: 500 milliseconds)
08:41:30,729 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Task Manager Registration but not connected to ResourceManager
08:41:30,732 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at localhost (akka.tcp://flink@localhost:54321/user/taskmanager) as 38047c3fc643910d58ecc414e8233f78. Current number of registered hosts is 1. Current number of alive task slots is 4.
08:41:30,741 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka.tcp://flink@localhost:6124/user/jobmanager), starting network stack and library cache.
08:41:30,743 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be localhost/127.0.0.1:54319. Starting BLOB cache.
08:41:30,745 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/blobStore-122d4c13-34d1-4c01-8a50-f6dfdae8b06b
08:41:30,996 INFO  org.apache.flink.streaming.api.environment.RemoteStreamEnvironment  - Running remotely at localhost:6124
08:41:31,085 INFO  org.apache.flink.client.program.StandaloneClusterClient       - Starting client actor system.
08:41:31,087 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils            - Trying to select the network interface and address to use by connecting to the leading JobManager.
08:41:31,087 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils            - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
08:41:31,088 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Retrieved new target address localhost/127.0.0.1:6124.
08:41:31,100 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
08:41:31,103 INFO  Remoting                                                      - Starting remoting
08:41:31,108 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@localhost:54324]
08:41:31,108 INFO  org.apache.flink.client.program.StandaloneClusterClient       - Submitting job with JobID: 74abb7674b9522ad3a204a1315cf609e. Waiting for job completion.
Submitting job with JobID: 74abb7674b9522ad3a204a1315cf609e. Waiting for job completion.
08:41:31,113 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Disconnect from JobManager null.
08:41:31,116 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Received SubmitJobAndWait(JobGraph(jobId: 74abb7674b9522ad3a204a1315cf609e)) but there is no connection to a JobManager yet.
08:41:31,116 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Received job Flink Streaming Job (74abb7674b9522ad3a204a1315cf609e).
08:41:31,125 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Connect to JobManager Actor[akka.tcp://flink@localhost:6124/user/jobmanager#-297192771].
08:41:31,126 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Connected to JobManager at Actor[akka.tcp://flink@localhost:6124/user/jobmanager#-297192771] with leader session id 00000000-0000-0000-0000-000000000000.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6124/user/jobmanager#-297192771] with leader session id 00000000-0000-0000-0000-000000000000.
08:41:31,126 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Sending message to JobManager akka.tcp://flink@localhost:6124/user/jobmanager to submit job Flink Streaming Job (74abb7674b9522ad3a204a1315cf609e) and wait for progress
08:41:31,128 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Upload jar files to job manager akka.tcp://flink@localhost:6124/user/jobmanager.
08:41:31,129 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Submit job to the job manager akka.tcp://flink@localhost:6124/user/jobmanager.
08:41:31,146 WARN  org.apache.flink.runtime.jobmanager.JobManager                - Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 74abb7674b9522ad3a204a1315cf609e),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 61d3ed9b-1c24-4bbf-99ef-c2a891613473 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
08:42:30,381 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Terminate JobClientActor.
08:42:30,382 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor      - Disconnect from JobManager Actor[akka.tcp://flink@localhost:6124/user/jobmanager#-297192771].
08:42:30,391 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
08:42:30,392 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
08:42:30,411 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1499)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
at com.lightbend.modelServer.ModelServingKeyedJob$.executeServer(ModelServingKeyedJob.scala:66)
at com.lightbend.modelServer.ModelServingKeyedJob$.main(ModelServingKeyedJob.scala:39)
at com.lightbend.modelServer.ModelServingKeyedJob.main(ModelServingKeyedJob.scala)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
... 10 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
08:42:30,424 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
08:42:30,433 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:54319
08:42:30,434 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager removed spill file directory /var/folders/3m/52z04fgs3hq88mzft9l0fsrm0000gn/T/flink-io-9ec461ff-086d-45cd-b69c-e9890217d8fc

Process finished with exit code 0

Any help would be appreciated.



Boris Lublinsky
FDP Architect
https://www.lightbend.com/


Re: Executing Flink server From IntelliJ

Chesnay Schepler
Hello,

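This problem is described in https://issues.apache.org/jira/browse/FLINK-6689.

In short, if you want to use the LocalFlinkMiniCluster, you should use a TestStreamEnvironment instead.
The RemoteStreamEnvironment only works with a proper Flink cluster. You can see this in the WARN line of your log: the JobManager discards the SubmitJob message because it expects leader session ID 61d3ed9b-1c24-4bbf-99ef-c2a891613473, while the client sent 00000000-0000-0000-0000-000000000000, so the client eventually fails with a submission timeout.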
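A minimal sketch of that suggestion, assuming Flink 1.3 with the flink-test-utils_2.11 artifact on the classpath (in the 1.3 line, TestStreamEnvironment lives in org.apache.flink.streaming.util and takes a LocalFlinkMiniCluster plus a parallelism). The object name MiniClusterJob and the toy buildGraph pipeline are hypothetical stand-ins for the poster's own code, which is not shown. The only substantive change from the original snippet is that the job is submitted through the mini cluster itself instead of through createRemoteEnvironment:

```scala
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.TestStreamEnvironment

// Hypothetical wrapper object; the poster's real job object is not shown.
object MiniClusterJob {

  // Hypothetical stand-in for the poster's buildGraph(env): a tiny bounded pipeline.
  def buildGraph(env: StreamExecutionEnvironment): Unit = {
    env.fromElements(1, 2, 3).map(_ * 2).print()
  }

  // Runs the job on an embedded LocalFlinkMiniCluster (e.g. to test queryable state).
  def executeServer(): JobExecutionResult = {
    val port = 6124
    val parallelism = 4
    val config = new Configuration()
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
    // As in the original snippet, enable queryable state explicitly for the mini cluster.
    config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true)

    val flinkCluster = new LocalFlinkMiniCluster(config, false)
    try {
      // Block until the TaskManager has registered with the JobManager.
      flinkCluster.start(true)
      // Wrap the Java TestStreamEnvironment in the Scala API environment. It submits
      // the JobGraph via the mini cluster, which uses the correct leader session ID.
      val env = new StreamExecutionEnvironment(
        new TestStreamEnvironment(flinkCluster, parallelism))
      buildGraph(env)
      // Blocks until the job finishes; the bounded source above terminates on its own.
      env.execute()
    } finally {
      flinkCluster.stop()
    }
  }
}
```

Wrapping the Java environment in the Scala StreamExecutionEnvironment keeps an existing Scala buildGraph(env) unchanged, and there is no separate submitJobAndWait call, so the job is submitted only once. For an unbounded streaming job, env.execute() blocks until the job is cancelled, just as in the original code.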

Regards,
Chesnay

(In reply to Boris Lublinsky, 14.07.2017 15:43)