Hello, I need to chain processing in the DataSet API, so I am launching several jobs with multiple env.execute() calls: topology1.define(); env.execute(); topology2.define(); env.execute(); This works fine when I run it within IntelliJ, but when I deploy it to my cluster, only the first topology is launched. Could you please shed some light on this issue? Regards, Bastien
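For reference, a minimal sketch of this pattern (the data, sinks, and job names below are placeholders, not the original job):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ChainedJobs {
      public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // First topology
        DataSet<String> first = env.fromElements("a", "b", "c");
        first.map(String::toUpperCase).writeAsText("/tmp/out1");
        env.execute("topology-1");

        // Second topology, defined only after the first job has finished
        DataSet<Integer> second = env.fromElements(1, 2, 3);
        second.map(i -> i * 2).writeAsText("/tmp/out2");
        env.execute("topology-2"); // only reached if main() keeps running after the first execute()
      }
    }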
The problem is that the REST API blocks on env.execute().
If you want to run your Flink job you have to submit it using the CLI client. As a workaround we wrote a Spring REST API that, to run a job, opens an SSH connection to the job manager and executes the bin/flink run command. If you're interested I can share some code. On Fri, Nov 23, 2018 at 4:32 PM bastien dine <[hidden email]> wrote:
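For example, a CLI submission looks like this (paths, class name, and parallelism are illustrative):

    ./bin/flink run -c com.example.MyJob -p 4 /path/to/my-job.jar --some-arg value

Since bin/flink run executes the jar's main() to completion, both env.execute() calls get to run.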
Oh god, if we have some code with an Accumulator after env.execute(), will that not be executed on the JobManager either? Thanks, I would be interested indeed! On Fri, Nov 23, 2018 at 4:37 PM, Flavio Pompermaier <[hidden email]> wrote:
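For context, the accumulator pattern in question looks roughly like this (the names are illustrative); the getAccumulatorResult() call after env.execute() is exactly the kind of code that would be skipped:

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.accumulators.LongCounter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class AccumulatorExample {
      public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            .map(new RichMapFunction<String, String>() {
              private final LongCounter counter = new LongCounter();

              @Override
              public void open(Configuration parameters) {
                // Register the accumulator under an (illustrative) name
                getRuntimeContext().addAccumulator("processed-records", counter);
              }

              @Override
              public String map(String value) {
                counter.add(1L);
                return value.toUpperCase();
              }
            })
            .writeAsText("/tmp/out");

        // Runs only if main() continues past execute(), i.e. not with detached/REST submission
        final JobExecutionResult result = env.execute("accumulator-job");
        final Long processed = result.getAccumulatorResult("processed-records");
        System.out.println("Processed records: " + processed);
      }
    }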
We solved this issue (reading the value of an accumulator) by calling a REST endpoint after the job ends, in order to store the value associated with the accumulator in some database. This is quite ugly, but I didn't find any better solution. You need the following Java dependency first:

<dependency>
  <groupId>org.apache.sshd</groupId>
  <artifactId>sshd-core</artifactId>
  <version>2.1.0</version>
</dependency>

-------------------- FlinkSshJobRun.java

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.config.keys.FilePasswordProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Opens an SSH connection to the job manager host and launches a job via bin/flink run. */
public class FlinkSshJobRun implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(FlinkSshJobRun.class);

  private final Integer sshPort;
  private final String sshUser;
  private final String sshKeyPassword;
  private final String command;
  private final Integer sshTimeoutMillis;
  private final String targetHost;
  private final Charset encoding;

  public FlinkSshJobRun(FlinkProperties flinkProps, String jarFile, String entryClass,
      String argsStr, Integer parallelism) throws MalformedURLException {
    final URL jobManagerUrl = new URL(flinkProps.getJobManagerUrl());
    this.targetHost = jobManagerUrl.getHost();
    this.sshUser = flinkProps.getSsh().getUser();
    this.sshPort = flinkProps.getSsh().getPort();
    this.sshKeyPassword = flinkProps.getSsh().getKeyPassword();
    this.sshTimeoutMillis = flinkProps.getSsh().getTimeoutMs();
    String flinkBinClient = flinkProps.getSsh().getFlinkDistDir() + "/bin/flink";
    if (parallelism != null) {
      flinkBinClient += " -p " + parallelism;
    }
    final String flinkUploadDir = flinkProps.getSsh().getFlinkJarUploadDir();
    final Path jarPathOnServer = Paths.get(flinkUploadDir, jarFile);
    // Assemble the remote 'flink run' command line
    this.command = flinkBinClient + " run -c " + entryClass
        + " " + jarPathOnServer
        + " " + (argsStr == null ? "" : argsStr);
    this.encoding = StandardCharsets.UTF_8;
  }

  @Override
  public void run() {
    ClientSession session = null;
    final ByteArrayOutputStream stdErr = new ByteArrayOutputStream();
    final ByteArrayOutputStream stdOut = new ByteArrayOutputStream();
    try (SshClient cl = SshClient.setUpDefaultClient()) {
      cl.setFilePasswordProvider(FilePasswordProvider.EMPTY);
      if (sshKeyPassword != null) {
        cl.setFilePasswordProvider(file -> sshKeyPassword);
      }
      cl.start();
      session = cl.connect(sshUser, targetHost, sshPort)
          .verify(sshTimeoutMillis)
          .getSession();
      session.auth().verify(Math.multiplyExact(sshTimeoutMillis, 4));
      LOG.info("Executing SSH: {}@{}:{} -> {}", sshUser, targetHost, sshPort, command);
      session.executeRemoteCommand(command, stdOut, stdErr, encoding);
      LOG.info("SSH successfully executed {}@{}:{} -> {}", sshUser, targetHost, sshPort, command);
      final String stdOutTxt = getOsContentOnException(true, stdOut);
      LOG.info("SSH stdout for {}@{}:{} -> {}\n{}", sshUser, targetHost, sshPort, command, stdOutTxt);
    } catch (IOException ex) {
      final String stdOutTxt = getOsContentOnException(true, stdOut);
      final String stdErrTxt = getOsContentOnException(false, stdErr);
      final String errorMsg = String.format(
          "Error during SSH execution %s@%s:%s -> %s%nSTDOUT:%s%n%nSTDERR:%s",
          sshUser, targetHost, sshPort, command, stdOutTxt, stdErrTxt);
      LOG.error(errorMsg, ex);
    } finally {
      try {
        stdErr.close();
      } catch (IOException ex) {
        LOG.error("Error during STDERR buffer close", ex);
      }
      try {
        stdOut.close();
      } catch (IOException ex) {
        LOG.error("Error during STDOUT buffer close", ex);
      }
      if (session != null) {
        try {
          session.close();
        } catch (IOException ex) {
          LOG.error("Error during SSH session close", ex);
        }
      }
    }
  }

  private String getOsContentOnException(boolean stdOut, final ByteArrayOutputStream os) {
    final String targetOutStream = stdOut ? "STDOUT" : "STDERR";
    String ret = String.format("<Error while retrieving %s>", targetOutStream);
    try {
      ret = os.toString(StandardCharsets.UTF_8.name());
    } catch (UnsupportedEncodingException ex2) {
      LOG.error("Error while reading " + targetOutStream + " after exception", ex2);
    }
    return ret;
  }
}

------------------- FlinkProperties.java

package it.okkam.datalinks.job.api.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "flink", ignoreUnknownFields = true)
public class FlinkProperties {

  private String jobManagerUrl;
  private final Ssh ssh = new Ssh();

  public String getJobManagerUrl() {
    return jobManagerUrl;
  }

  public void setJobManagerUrl(String jobManagerUrl) {
    this.jobManagerUrl = jobManagerUrl;
  }

  public Ssh getSsh() {
    return ssh;
  }

  public static class Async {
    private Integer corePoolSize;
    private Integer maxPoolSize;
    private Integer queueCapacity;

    public Integer getCorePoolSize() {
      return corePoolSize;
    }

    public void setCorePoolSize(final Integer corePoolSize) {
      this.corePoolSize = corePoolSize;
    }

    public Integer getMaxPoolSize() {
      return maxPoolSize;
    }

    public void setMaxPoolSize(final Integer maxPoolSize) {
      this.maxPoolSize = maxPoolSize;
    }

    public Integer getQueueCapacity() {
      return queueCapacity;
    }

    public void setQueueCapacity(final Integer queueCapacity) {
      this.queueCapacity = queueCapacity;
    }
  }

  public static class Ssh {
    private Integer port;
    private Integer timeoutMs;
    private String user;
    private String keyPassword;
    private String flinkDistDir;
    private String flinkJarUploadDir;
    private final Async async = new Async();

    public Async getAsync() {
      return async;
    }

    public Integer getPort() {
      return port;
    }

    public void setPort(Integer port) {
      this.port = port;
    }

    public String getUser() {
      return user;
    }

    public void setUser(String user) {
      this.user = user;
    }

    public Integer getTimeoutMs() {
      return timeoutMs;
    }

    public void setTimeoutMs(Integer timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    public String getFlinkDistDir() {
      return flinkDistDir;
    }

    public void setFlinkDistDir(String flinkDistDir) {
      this.flinkDistDir = flinkDistDir;
    }

    public String getFlinkJarUploadDir() {
      return flinkJarUploadDir;
    }

    public void setFlinkJarUploadDir(String flinkJarUploadDir) {
      this.flinkJarUploadDir = flinkJarUploadDir;
    }

    public String getKeyPassword() {
      return keyPassword;
    }

    public void setKeyPassword(String keyPassword) {
      this.keyPassword = keyPassword;
    }
  }
}

------------------- REST service (via Spring Boot)

private TaskExecutor taskExecutor;

@Override
@GetMapping(METHOD_JOB_RUN + "/{" + PARAMS_JARFILE + ":.+}")
@ResponseStatus(HttpStatus.OK)
public JobRun runJob(
    @PathVariable(name = PARAMS_JARFILE) String jarFile,
    @RequestParam(name = "entry-class", required = false) String entryClass,
    @RequestParam(name = "program-args", required = false) String argsStr,
    @RequestParam(name = "parallelism", required = false) Integer parallelism)
    throws IOException {
  final long start = System.currentTimeMillis();
  // Placeholder job id: it could be set properly if the Flink REST API worked as expected
  final String jobId = "OK";
  // Run the SSH submission asynchronously so the HTTP call returns immediately
  taskExecutor.execute(new FlinkSshJobRun(flinkProperties, jarFile, entryClass, argsStr, parallelism));
  final long elapsed = System.currentTimeMillis() - start;
  return new JobRun(elapsed, null, jobId);
}

On Fri, Nov 23, 2018 at 4:47 PM bastien dine <[hidden email]> wrote:
Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809