multiple jobs in same flink app


multiple jobs in same flink app

Qihua Yang
Hi,
Does anyone know how to run multiple jobs in the same Flink application?
I did a simple test. The first job started, and I saw its log message. I also saw the second job's log message, but the second job itself never started.

public void testJobs() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // source/sink implementations and logger setup elided for brevity
    DataStream<String> stream1 = env.addSource(new SourceFunction<String>() { /* ... */ });
    stream1.addSink(new FlinkKafkaProducer<>(/* topic, schema, properties */));
    log.info("### first job");
    env.execute("Job 1");

    env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> stream2 = env.addSource(new SourceFunction<String>() { /* ... */ });
    stream2.addSink(new DiscardingSink<>());
    log.info("### second job");
    env.execute("Job 2");
}

Here is the log:
### first job
INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job 00000000000000000000000000000000 is submitted.
INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Submitting Job with JobId=00000000000000000000000000000000.
INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received JobGraph submission 00000000000000000000000000000000 (job1).
INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting job 00000000000000000000000000000000 (job1).

INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution of job job1 (00000000000000000000000000000000) under job master id b03cde9dc2aebdb39c46cda4c2a94c07.
INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job job1 (00000000000000000000000000000000) switched from state CREATED to RUNNING.

### second job
WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter : ### second job
INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved ResourceManager address, beginning registration
INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/00000000000000000000000000000000/job_manager_lock.
INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering job manager [hidden email]://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000.
INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job 00000000000000000000000000000000 was recovered successfully.
INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered job manager [hidden email]://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000.
INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager successfully registered at ResourceManager, leader id: 956d4431ca90d45d92c027046cd0404e.
INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and profile ResourceProfile{UNKNOWN} from resource manager.
INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{UNKNOWN} for job 00000000000000000000000000000000 with allocation id 21134414fc60d4ef3e940609cef960b6.
INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and profile ResourceProfile{UNKNOWN} from resource manager.
INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{UNKNOWN} for job 00000000000000000000000000000000 with allocation id 650bd9100a35ef5086fd4614f5253b55.

Re: multiple jobs in same flink app

Arvid Heise-4
Hi,

env.execute("Job 1"); is a blocking call, so control never reaches the second job. You either have to use executeAsync() or submit the second job from a separate thread. If Job 1 is bounded and eventually finishes, sequential execution like yours would also work.
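The executeAsync() variant would look roughly like this (a sketch only: the sources and sinks are placeholders, and executeAsync()/JobClient require Flink 1.10+):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() { /* ... */ });
stream1.addSink(new FlinkKafkaProducer<>(/* topic, schema, properties */));
JobClient client1 = env.executeAsync("Job 1"); // returns immediately with a handle

StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream2 = env2.addSource(new SourceFunction<String>() { /* ... */ });
stream2.addSink(new DiscardingSink<>());
JobClient client2 = env2.executeAsync("Job 2"); // second job submitted without waiting
```

Whether two detached jobs are actually accepted depends on the deployment mode, as the rest of the thread shows.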

However, I think what you actually want is to use the same env with two topologies and a single execute(), like this:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() { /* ... */ });
stream1.addSink(new FlinkKafkaProducer<>(/* topic, schema, properties */));
DataStream<String> stream2 = env.addSource(new SourceFunction<String>() { /* ... */ });
stream2.addSink(new DiscardingSink<>());
env.execute("Job 1+2");

On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <[hidden email]> wrote:

Re: multiple jobs in same flink app

Qihua Yang
Hi,

Thank you for your reply. What I want is a Flink app with multiple jobs, each job managing one stream. Currently our Flink app has only one job that manages multiple streams.
I did try env.executeAsync(), but it still doesn't work. From the log, when the second executeAsync() was called, it shows "Job 00000000000000000000000000000000 was recovered successfully."
It looks like the second executeAsync() recovered the first job rather than starting a second one.

Thanks,
Qihua


On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <[hidden email]> wrote:

Re: multiple jobs in same flink app

Arvid Heise-4
Hi Qihua,

Which execution mode are you using?

On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <[hidden email]> wrote:

Re: multiple jobs in same flink app

Qihua Yang
Hi,

I am using application mode.
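Judging from the ZooKeeperLeaderRetrievalService lines in the log above, the cluster also has ZooKeeper high availability enabled, with a configuration roughly like this (an illustrative fragment; quorum host and storage path are placeholders):

```yaml
# flink-conf.yaml fragment (illustrative)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host:2181
high-availability.storageDir: hdfs:///flink/ha/
```

In Application Mode with HA enabled, Flink pins the JobID of the single supported job to all zeros, which would explain both the zeroed JobID and the "recovered successfully" message when a second job is submitted.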

Thanks,
Qihua

On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <[hidden email]> wrote: