Multiple env.execute() into one Flink batch job


Multiple env.execute() into one Flink batch job

bastien dine
Hello,

I need to chain processing steps in the DataSet API, so I am launching several jobs with multiple env.execute() calls:

topology1.define();
env.execute();

topology2.define();
env.execute();

This works fine when I run it within IntelliJ.
But when I deploy it to my cluster, only the first topology is launched.

Could you please shed some light on this issue?

Regards,
Bastien

Re: Multiple env.execute() into one Flink batch job

Flavio Pompermaier
The problem is that the REST API blocks on env.execute().
If you want to run your full Flink job you have to submit it using the CLI client.
As a workaround we wrote a Spring REST API that, in order to run a job, opens an SSH connection to the job manager and executes the bin/flink run command.
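For reference, the command executed over SSH is essentially bin/flink run -c <entry-class> <path-to-jar> [program args], optionally with -p <parallelism>; the exact paths depend on where the Flink distribution and the uploaded jar live on the job manager host.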

If you're interested I can share some code.






Re: Multiple env.execute() into one Flink batch job

bastien dine
Oh god, so if we have some code that reads an Accumulator after env.execute(), that will not be executed on the JobManager either?
Thanks, I would indeed be interested!

------------------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io



Re: Multiple env.execute() into one Flink batch job

Flavio Pompermaier
We solved this issue (reading the value of an accumulator) by calling a REST endpoint after the job ends, in order to store the value associated with the accumulator in some database.
It is quite ugly, but I didn't find any better solution.
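To make the accumulator part concrete, here is a minimal, self-contained sketch of that pattern: the job registers a LongCounter, reads its value from the JobExecutionResult returned by env.execute(), and then pushes it to an external HTTP endpoint. The accumulator name, output path and endpoint URL are illustrative placeholders, not our real values.

-------------------- AccumulatorReportJob.java (sketch)
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class AccumulatorReportJob {

  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("a", "b", "c")
        .map(new RichMapFunction<String, String>() {
          private final LongCounter processed = new LongCounter();

          @Override
          public void open(Configuration parameters) {
            // Register the accumulator so its value ends up in the JobExecutionResult.
            getRuntimeContext().addAccumulator("processed-records", processed);
          }

          @Override
          public String map(String value) {
            processed.add(1L);
            return value;
          }
        })
        .writeAsText("file:///tmp/accumulator-report-output");

    // Blocks until the job has finished; afterwards the accumulator values are available locally.
    final JobExecutionResult result = env.execute("accumulator-report-job");
    final Long processedRecords = result.getAccumulatorResult("processed-records");

    // Persist the value outside of Flink, e.g. by calling a REST endpoint (URL is illustrative).
    final HttpURLConnection conn =
        (HttpURLConnection) new URL("http://stats-service/accumulators").openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.getOutputStream().write(
        ("processed-records=" + processedRecords).getBytes(StandardCharsets.UTF_8));
    System.out.println("Accumulator upload returned HTTP " + conn.getResponseCode());
    conn.disconnect();
  }
}

Note that this only works when the code after env.execute() actually runs, i.e. when the job is submitted with the CLI client as described above.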

And below is the code that runs the job over SSH (of course it's not complete, but it should give you some insight):

You first need to add the following Java dependency:
<dependency>
  <groupId>org.apache.sshd</groupId>
  <artifactId>sshd-core</artifactId>
  <version>2.1.0</version>
</dependency>

-------------------- FlinkSshJobRun.java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.config.keys.FilePasswordProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSshJobRun implements Runnable {
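  // Workaround job launcher: opens an SSH connection to the job manager host and runs the
  // "bin/flink run" CLI command there, capturing stdout/stderr for logging.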
  private static final Logger LOG = LoggerFactory.getLogger(FlinkSshJobRun.class);

  private final Integer sshPort;
  private final String sshUser;
  private final String sshKeyPassword;
  private final String command;
  private final Integer sshTimeoutMillis;
  private final String targetHost;
  private final Charset encoding;

  public FlinkSshJobRun(FlinkProperties flinkProps,
      String jarFile, String entryClass, String argsStr, Integer parallelism)
      throws MalformedURLException {
    final URL jobManagerUrl = new URL(flinkProps.getJobManagerUrl());
    this.targetHost = jobManagerUrl.getHost();
    this.sshUser = flinkProps.getSsh().getUser();
    this.sshPort = flinkProps.getSsh().getPort();
    this.sshKeyPassword = flinkProps.getSsh().getKeyPassword();
    this.sshTimeoutMillis = flinkProps.getSsh().getTimeoutMs();
    String flinkBinClient = flinkProps.getSsh().getFlinkDistDir() + "/bin/flink";
    if (parallelism != null) {
      flinkBinClient += " -p " + parallelism;
    }
    final String flinkUploadDir = flinkProps.getSsh().getFlinkJarUploadDir();
    final Path jarPathOnServer = Paths.get(flinkUploadDir, jarFile);//
    this.command = flinkBinClient + " run -c " + entryClass //
        + " " + jarPathOnServer //
        + " " + (argsStr == null ? "" : argsStr);
    this.encoding = Charset.forName(StandardCharsets.UTF_8.name());
  }

  @Override
  public void run() {
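    // Connect over SSH, execute the flink CLI command and log its output; any failure is
    // logged together with the captured stdout/stderr.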
    ClientSession session = null;
    final ByteArrayOutputStream stdErr = new ByteArrayOutputStream();
    final ByteArrayOutputStream stdOut = new ByteArrayOutputStream();
    try (SshClient cl = SshClient.setUpDefaultClient()) {

      cl.setFilePasswordProvider(FilePasswordProvider.EMPTY);
      if (sshKeyPassword != null) {
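        // Provide the passphrase used to decrypt the SSH private key.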
        cl.setFilePasswordProvider(file -> sshKeyPassword);
      }
      cl.start();
      session = cl.connect(sshUser, targetHost, sshPort)//
          .verify(sshTimeoutMillis)//
          .getSession();
      session.auth().verify(Math.multiplyExact(sshTimeoutMillis, 4));
      LOG.info("Executing SSH: {}@{}:{} -> {}", sshUser, targetHost, sshPort, command);
      session.executeRemoteCommand(command, stdOut, stdErr, encoding);
      LOG.info("SSH successfully executed {}@{}:{} -> {}", sshUser, targetHost, sshPort, command);
      final String stdOutTxt = getOsContentOnException(true, stdOut);
      LOG.info("SSH stdout for {}@{}:{} -> {}\n{}", sshUser, targetHost, sshPort, command,
          stdOutTxt);
    } catch (IOException ex) {
      final String stdOutTxt = getOsContentOnException(true, stdOut);
      final String stdErrTxt = getOsContentOnException(false, stdErr);
      final String errorMsg = String.format(
          "Error during SSH execution  %s@%s:%s -> %s%nSTDOUT:%s%n%nSTDERR:%s", sshUser, targetHost,
          sshPort, command, stdOutTxt, stdErrTxt);
      LOG.error(errorMsg, ex);
    } finally {
      try {
        stdErr.close();
      } catch (IOException ex) {
        LOG.error("Error during STDERR buffer close", ex);
      }
      try {
        stdOut.close();
      } catch (IOException ex) {
        LOG.error("Error during STDOUT buffer close", ex);
      }
      if (session != null) {
        try {
          session.close();
        } catch (IOException ex) {
          LOG.error("Error during SSH session close", ex);
        }
      }
    }
  }

  private String getOsContentOnException(boolean stdOut, final ByteArrayOutputStream os) {
    final String targetOutStream = stdOut ? "STDOUT" : "STDERR";
    String ret = String.format("<Error while retrieving %s>", targetOutStream);
    try {
      ret = os.toString(StandardCharsets.UTF_8.name());
    } catch (UnsupportedEncodingException ex2) {
      LOG.error("Error while reading " + targetOutStream + " after exception", ex2);
    }
    return ret;
  }
}

-------------------FlinkProperties.java

package it.okkam.datalinks.job.api.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "flink", ignoreUnknownFields = true)
public class FlinkProperties {
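  // Maps the "flink.*" application properties: the job manager URL plus the nested SSH settings.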
  private String jobManagerUrl;
  private final Ssh ssh = new Ssh();

  public String getJobManagerUrl() {
    return jobManagerUrl;
  }

  public void setJobManagerUrl(String jobManagerUrl) {
    this.jobManagerUrl = jobManagerUrl;
  }

  public Ssh getSsh() {
    return ssh;
  }

  public static class Async {

    private Integer corePoolSize;
    private Integer maxPoolSize;
    private Integer queueCapacity;

    public Integer getCorePoolSize() {
      return corePoolSize;
    }

    public void setCorePoolSize(final Integer corePoolSize) {
      this.corePoolSize = corePoolSize;
    }

    public Integer getMaxPoolSize() {
      return maxPoolSize;
    }

    public void setMaxPoolSize(final Integer maxPoolSize) {
      this.maxPoolSize = maxPoolSize;
    }

    public Integer getQueueCapacity() {
      return queueCapacity;
    }

    public void setQueueCapacity(final Integer queueCapacity) {
      this.queueCapacity = queueCapacity;
    }
  }

  public static class Ssh {
    private Integer port;
    private Integer timeoutMs;
    private String user;
    private String keyPassword;
    private String flinkDistDir;
    private String flinkJarUploadDir;

    private final Async async = new Async();

    public Async getAsync() {
      return async;
    }

    public Integer getPort() {
      return port;
    }

    public void setPort(Integer port) {
      this.port = port;
    }

    public String getUser() {
      return user;
    }

    public void setUser(String user) {
      this.user = user;
    }

    public Integer getTimeoutMs() {
      return timeoutMs;
    }

    public void setTimeoutMs(Integer timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    public String getFlinkDistDir() {
      return flinkDistDir;
    }

    public void setFlinkDistDir(String flinkDistDir) {
      this.flinkDistDir = flinkDistDir;
    }

    public String getFlinkJarUploadDir() {
      return flinkJarUploadDir;
    }

    public void setFlinkJarUploadDir(String flinkJarUploadDir) {
      this.flinkJarUploadDir = flinkJarUploadDir;
    }

    public String getKeyPassword() {
      return keyPassword;
    }

    public void setKeyPassword(String keyPassword) {
      this.keyPassword = keyPassword;
    }
  }
}

------------------- Rest service (via Spring boot)
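// Fragment of a Spring @RestController: flinkProperties, the injected taskExecutor bean and the
// METHOD_JOB_RUN / PARAMS_JARFILE constants are assumed to be declared elsewhere in the class.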

private TaskExecutor taskExecutor;
  @Override
  @GetMapping(METHOD_JOB_RUN + "/{" + PARAMS_JARFILE + ":.+}")
  @ResponseStatus(HttpStatus.OK)
  public JobRun runJob(//
      @PathVariable(name = PARAMS_JARFILE) String jarFile, //
      @RequestParam(name = "entry-class", required = false) String entryClass,
      @RequestParam(name = "program-args", required = false) String argsStr,
      @RequestParam(name = "parallelism", required = false) Integer parallelism)
      throws IOException {

    final long start = System.currentTimeMillis();
    final String jobId = "OK"; // it could be set to the actual job id if the Flink REST API worked as expected
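    // Hand the SSH submission off to the task executor so the HTTP request returns immediately.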
    taskExecutor.execute(new FlinkSshJobRun(flinkProperties, jarFile, entryClass, argsStr, parallelism));    
    final long elapsed = System.currentTimeMillis() - start;
    return new JobRun(elapsed, null, jobId);
  }



--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809