Submitting jobs via Java code

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Submitting jobs via Java code

Luigi Sgaglione
Hi,

I am a beginner in Flink and I'm trying to deploy a simple example using a java client in a remote Flink server (1.4.0).

I'm using org.apache.flink.client.program.Client

this is the used code:
Configuration config = new Configuration();
config.setString("jobmanager.rpc.address", "192.168.149.130");
config.setInteger("jobmanager.rpc.port", 6123);

Client c = new Client(config);

PackagedProgram prg = new PackagedProgram(new File("target/Test-1.jar"));
c.runDetached(prg, 1);

but when I try to deploy the jar I receive the following error:

16:03:20,035 INFO  org.apache.flink.client.program.Client                        - Looking up JobManager
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
at org.apache.flink.client.program.Client.runDetached(Client.java:380)
at org.apache.flink.client.program.Client.runDetached(Client.java:355)
at org.apache.flink.client.program.Client.runDetached(Client.java:340)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
at flink.Job.main(Job.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runDetached(Client.java:279)
at flink.DeployJob.main(DeployJob.java:24)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:102)
at org.apache.flink.client.program.Client.getJobManagerGateway(Client.java:567)
at org.apache.flink.client.program.Client.runDetached(Client.java:378)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:116)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:100)
... 17 more



Maybe I missed some configuration of the client.
Can you help me to solve the problem?

Thanks
Reply | Threaded
Open this post in threaded view
|

Re: Submitting jobs via Java code

Timo Walther
Hi Luigi,

can you try to load an entire configuration file via GlobalConfiguration.loadConfiguration(flinkConfigDir). Maybe you tell us a little bit what you want to achieve?

Is the programmatic submission a requirement for you? Did you consider using the RemoteStreamEnvironment?

Regards,
Timo


Am 1/17/18 um 5:08 PM schrieb Luigi Sgaglione:
Hi,

I am a beginner in Flink and I'm trying to deploy a simple example using a java client in a remote Flink server (1.4.0).

I'm using org.apache.flink.client.program.Client

this is the used code:
Configuration config = new Configuration();
config.setString("jobmanager.rpc.address", "192.168.149.130");
config.setInteger("jobmanager.rpc.port", 6123);

Client c = new Client(config);

PackagedProgram prg = new PackagedProgram(new File("target/Test-1.jar"));
c.runDetached(prg, 1);

but when I try to deploy the jar I receive the following error:

16:03:20,035 INFO  org.apache.flink.client.program.Client                        - Looking up JobManager
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
at org.apache.flink.client.program.Client.runDetached(Client.java:380)
at org.apache.flink.client.program.Client.runDetached(Client.java:355)
at org.apache.flink.client.program.Client.runDetached(Client.java:340)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
at flink.Job.main(Job.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runDetached(Client.java:279)
at flink.DeployJob.main(DeployJob.java:24)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:102)
at org.apache.flink.client.program.Client.getJobManagerGateway(Client.java:567)
at org.apache.flink.client.program.Client.runDetached(Client.java:378)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:116)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:100)
... 17 more



Maybe I missed some configuration of the client.
Can you help me to solve the problem?

Thanks


Reply | Threaded
Open this post in threaded view
|

Re: Submitting jobs via Java code

Luigi Sgaglione
Hi Timo,
my objective is to create a web interface that allows me to edit and deploy jobs on Flink.

To do so I'm evaluating all possibilities provided by Flink APIs.

What do you think that is the best solution?

Thanks

2018-01-18 9:39 GMT+01:00 Timo Walther <[hidden email]>:
Hi Luigi,

can you try to load an entire configuration file via GlobalConfiguration.loadConfiguration(flinkConfigDir). Maybe you tell us a little bit what you want to achieve?

Is the programmatic submission a requirement for you? Did you consider using the RemoteStreamEnvironment?

Regards,
Timo


Am 1/17/18 um 5:08 PM schrieb Luigi Sgaglione:
Hi,

I am a beginner in Flink and I'm trying to deploy a simple example using a java client in a remote Flink server (1.4.0).

I'm using org.apache.flink.client.program.Client

this is the used code:
Configuration config = new Configuration();
config.setString("jobmanager.rpc.address", "192.168.149.130");
config.setInteger("jobmanager.rpc.port", 6123);

Client c = new Client(config);

PackagedProgram prg = new PackagedProgram(new File("target/Test-1.jar"));
c.runDetached(prg, 1);

but when I try to deploy the jar I receive the following error:

16:03:20,035 INFO  org.apache.flink.client.program.Client                        - Looking up JobManager
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
at org.apache.flink.client.program.Client.runDetached(Client.java:380)
at org.apache.flink.client.program.Client.runDetached(Client.java:355)
at org.apache.flink.client.program.Client.runDetached(Client.java:340)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
at flink.Job.main(Job.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runDetached(Client.java:279)
at flink.DeployJob.main(DeployJob.java:24)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:102)
at org.apache.flink.client.program.Client.getJobManagerGateway(Client.java:567)
at org.apache.flink.client.program.Client.runDetached(Client.java:378)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:116)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:100)
... 17 more



Maybe I missed some configuration of the client.
Can you help me to solve the problem?

Thanks



Reply | Threaded
Open this post in threaded view
|

Re: Submitting jobs via Java code

Timo Walther
In reply to this post by Timo Walther
Hi Luigi,

I'm also working on a solution for submitting jobs programmatically. You can look into my working branch [1]. As far as I know, the best and most stable solution is using the ClusterClient. But this is internal API and might change.

You could also use Flink's REST API for submitting a job [2].

Regards,
Timo

[1] https://github.com/twalthr/flink/blob/FLINK-7594_rebased/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#submitting-programs

Am 1/18/18 um 11:41 AM schrieb Luigi Sgaglione:
Hi Timo,
my objective is to create a web interface that allows me to edit and deploy jobs on Flink.

To do so I'm evaluating all possibilities provided by Flink APIs.

What do you think that is the best solution?

Thanks
Luigi

Il 18/gen/2018 09:39, "Timo Walther" <[hidden email]> ha scritto:
Hi Luigi,

can you try to load an entire configuration file via GlobalConfiguration.loadConfiguration(flinkConfigDir). Maybe you tell us a little bit what you want to achieve?

Is the programmatic submission a requirement for you? Did you consider using the RemoteStreamEnvironment?

Regards,
Timo


Am 1/17/18 um 5:08 PM schrieb Luigi Sgaglione:
Hi,

I am a beginner in Flink and I'm trying to deploy a simple example using a java client in a remote Flink server (1.4.0).

I'm using org.apache.flink.client.program.Client

this is the used code:
Configuration config = new Configuration();
config.setString("jobmanager.rpc.address", "192.168.149.130");
config.setInteger("jobmanager.rpc.port", 6123);

Client c = new Client(config);

PackagedProgram prg = new PackagedProgram(new File("target/Test-1.jar"));
c.runDetached(prg, 1);

but when I try to deploy the jar I receive the following error:

16:03:20,035 INFO  org.apache.flink.client.program.Client                        - Looking up JobManager
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
at org.apache.flink.client.program.Client.runDetached(Client.java:380)
at org.apache.flink.client.program.Client.runDetached(Client.java:355)
at org.apache.flink.client.program.Client.runDetached(Client.java:340)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
at flink.Job.main(Job.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runDetached(Client.java:279)
at flink.DeployJob.main(DeployJob.java:24)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:102)
at org.apache.flink.client.program.Client.getJobManagerGateway(Client.java:567)
at org.apache.flink.client.program.Client.runDetached(Client.java:378)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:116)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:100)
... 17 more



Maybe I missed some configuration of the client.
Can you help me to solve the problem?

Thanks



Reply | Threaded
Open this post in threaded view
|

Re: Submitting jobs via Java code

Luigi Sgaglione
Hi Timo,

I think that the REST API is the most suitable solution. Thanks.

So, I'm trying to use the Flink REST API and I'm able to perform get request but not the post one.

In particular when I issue a post to upload the jar I receive this error form the server: {"error": "Failed to upload the file."}

this is the used code:

     URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");
HttpURLConnection urlConnection = (HttpURLConnection) serverUrl.openConnection();
 
String boundaryString = "------BoundaryXXXX";
String crlf = "\r\n";
String fileUrl = "Test-1.jar";
File jarToUpload = new File(fileUrl);
 
urlConnection.setDoOutput(true);
urlConnection.setRequestMethod("POST");
urlConnection.setRequestProperty("Connection", "Keep-Alive");
urlConnection.setRequestProperty("Cache-Control", "no-cache");
urlConnection.addRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundaryString);
 
OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
BufferedWriter httpRequestBodyWriter =
    new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));

String payloadString = boundaryString + crlf + "Content-Disposition: form-data;"
        + " name=\"jarfile\";"
        + " filename=\"Test-1.jar\""+crlf
        + "Content-Type: application/x-java-archive"+crlf+crlf;
System.out.println(payloadString);
httpRequestBodyWriter.write(payloadString);
httpRequestBodyWriter.flush();

// Write the actual file contents
FileInputStream inputStream = new FileInputStream(jarToUpload);
 
int bytesRead;
byte[] dataBuffer = new byte[1024];
while((bytesRead = inputStream.read(dataBuffer)) != -1) {
    outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
}
 
outputStreamToRequestBody.flush();
 
httpRequestBodyWriter.write(boundaryString +crlf);
httpRequestBodyWriter.flush();
 
inputStream.close();
outputStreamToRequestBody.close();
httpRequestBodyWriter.close();
BufferedReader httpResponseReader =
    new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
String lineRead;
while((lineRead = httpResponseReader.readLine()) != null) {
    System.out.println(lineRead);
}

The documentation of Flink REST API is not so detailed, or better it doesn't include a clear example.

Do you have any idea to solve the error?


thanks

2018-01-18 12:54 GMT+01:00 Timo Walther <[hidden email]>:
Hi Luigi,

I'm also working on a solution for submitting jobs programmatically. You can look into my working branch [1]. As far as I know, the best and most stable solution is using the ClusterClient. But this is internal API and might change.

You could also use Flink's REST API for submitting a job [2].

Regards,
Timo

[1] https://github.com/twalthr/flink/blob/FLINK-7594_rebased/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#submitting-programs

Am 1/18/18 um 11:41 AM schrieb Luigi Sgaglione:
Hi Timo,
my objective is to create a web interface that allows me to edit and deploy jobs on Flink.

To do so I'm evaluating all possibilities provided by Flink APIs.

What do you think that is the best solution?

Thanks
Luigi

Il 18/gen/2018 09:39, "Timo Walther" <[hidden email]> ha scritto:
Hi Luigi,

can you try to load an entire configuration file via GlobalConfiguration.loadConfiguration(flinkConfigDir). Maybe you tell us a little bit what you want to achieve?

Is the programmatic submission a requirement for you? Did you consider using the RemoteStreamEnvironment?

Regards,
Timo


Am 1/17/18 um 5:08 PM schrieb Luigi Sgaglione:
Hi,

I am a beginner in Flink and I'm trying to deploy a simple example using a java client in a remote Flink server (1.4.0).

I'm using org.apache.flink.client.program.Client

this is the used code:
Configuration config = new Configuration();
config.setString("jobmanager.rpc.address", "192.168.149.130");
config.setInteger("jobmanager.rpc.port", 6123);

Client c = new Client(config);

PackagedProgram prg = new PackagedProgram(new File("target/Test-1.jar"));
c.runDetached(prg, 1);

but when I try to deploy the jar I receive the following error:

16:03:20,035 INFO  org.apache.flink.client.program.Client                        - Looking up JobManager
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
at org.apache.flink.client.program.Client.runDetached(Client.java:380)
at org.apache.flink.client.program.Client.runDetached(Client.java:355)
at org.apache.flink.client.program.Client.runDetached(Client.java:340)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
at flink.Job.main(Job.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runDetached(Client.java:279)
at flink.DeployJob.main(DeployJob.java:24)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:102)
at org.apache.flink.client.program.Client.getJobManagerGateway(Client.java:567)
at org.apache.flink.client.program.Client.runDetached(Client.java:378)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:116)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:100)
... 17 more



Maybe I missed some configuration of the client.
Can you help me to solve the problem?

Thanks




Reply | Threaded
Open this post in threaded view
|

Re: Submitting jobs via Java code

Luigi Sgaglione
Solved.

this is the corret code to deploy a Job programmatically via REST API.

Thanks

URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");
HttpURLConnection urlConnection = (HttpURLConnection) serverUrl.openConnection();
 
String boundaryString = "------BoundaryXXXX";
String crlf = "\r\n";
String fileUrl = "Test-1.jar";
File jarToUpload = new File(fileUrl);
 
urlConnection.setUseCaches(false);
urlConnection.setDoOutput(true); 
urlConnection.setDoInput(true);
        
urlConnection.setRequestMethod("POST");
urlConnection.setRequestProperty("Connection", "Keep-Alive");
urlConnection.setRequestProperty("Cache-Control", "no-cache");
urlConnection.addRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundaryString);
 
OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
BufferedWriter httpRequestBodyWriter =
    new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));
 
// Include the section to describe the file

String payloadString = "--"+boundaryString + crlf + "Content-Disposition: form-data;"
        + " name=\"jarfile\";"
        + " filename=\"Test-1.jar\""+crlf
        + "Content-Type: application/x-java-archive"+crlf+crlf;
System.out.println(payloadString);
httpRequestBodyWriter.write(payloadString);
httpRequestBodyWriter.flush();

// Write the actual file contents
FileInputStream inputStream = new FileInputStream(jarToUpload);
 
int bytesRead;
byte[] dataBuffer = new byte[1024];
while((bytesRead = inputStream.read(dataBuffer)) != -1) {
    outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
}
 
outputStreamToRequestBody.flush();
 
httpRequestBodyWriter.write(crlf+"--"+boundaryString+"--" +crlf);
httpRequestBodyWriter.flush();
 
inputStream.close();
outputStreamToRequestBody.close();
httpRequestBodyWriter.close();
BufferedReader httpResponseReader =
    new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
String lineRead;
while((lineRead = httpResponseReader.readLine()) != null) {
    System.out.println(lineRead);
}

}

Best Regards
Luigi 

2018-01-18 17:02 GMT+01:00 Luigi Sgaglione <[hidden email]>:
Hi Timo,

I think that the REST API is the most suitable solution. Thanks.

So, I'm trying to use the Flink REST API and I'm able to perform get request but not the post one.

In particular when I issue a post to upload the jar I receive this error form the server: {"error": "Failed to upload the file."}

this is the used code:

     URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");
HttpURLConnection urlConnection = (HttpURLConnection) serverUrl.openConnection();
 
String boundaryString = "------BoundaryXXXX";
String crlf = "\r\n";
String fileUrl = "Test-1.jar";
File jarToUpload = new File(fileUrl);
 
urlConnection.setDoOutput(true);
urlConnection.setRequestMethod("POST");
urlConnection.setRequestProperty("Connection", "Keep-Alive");
urlConnection.setRequestProperty("Cache-Control", "no-cache");
urlConnection.addRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundaryString);
 
OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
BufferedWriter httpRequestBodyWriter =
    new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));

String payloadString = boundaryString + crlf + "Content-Disposition: form-data;"
        + " name=\"jarfile\";"
        + " filename=\"Test-1.jar\""+crlf
        + "Content-Type: application/x-java-archive"+crlf+crlf;
System.out.println(payloadString);
httpRequestBodyWriter.write(payloadString);
httpRequestBodyWriter.flush();

// Write the actual file contents
FileInputStream inputStream = new FileInputStream(jarToUpload);
 
int bytesRead;
byte[] dataBuffer = new byte[1024];
while((bytesRead = inputStream.read(dataBuffer)) != -1) {
    outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
}
 
outputStreamToRequestBody.flush();
 
httpRequestBodyWriter.write(boundaryString +crlf);
httpRequestBodyWriter.flush();
 
inputStream.close();
outputStreamToRequestBody.close();
httpRequestBodyWriter.close();
BufferedReader httpResponseReader =
    new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
String lineRead;
while((lineRead = httpResponseReader.readLine()) != null) {
    System.out.println(lineRead);
}

The documentation of Flink REST API is not so detailed, or better it doesn't include a clear example.

Do you have any idea to solve the error?


thanks

2018-01-18 12:54 GMT+01:00 Timo Walther <[hidden email]>:
Hi Luigi,

I'm also working on a solution for submitting jobs programmatically. You can look into my working branch [1]. As far as I know, the best and most stable solution is using the ClusterClient. But this is internal API and might change.

You could also use Flink's REST API for submitting a job [2].

Regards,
Timo

[1] https://github.com/twalthr/flink/blob/FLINK-7594_rebased/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#submitting-programs

Am 1/18/18 um 11:41 AM schrieb Luigi Sgaglione:
Hi Timo,
my objective is to create a web interface that allows me to edit and deploy jobs on Flink.

To do so I'm evaluating all possibilities provided by Flink APIs.

What do you think that is the best solution?

Thanks
Luigi

Il 18/gen/2018 09:39, "Timo Walther" <[hidden email]> ha scritto:
Hi Luigi,

can you try to load an entire configuration file via GlobalConfiguration.loadConfiguration(flinkConfigDir). Maybe you tell us a little bit what you want to achieve?

Is the programmatic submission a requirement for you? Did you consider using the RemoteStreamEnvironment?

Regards,
Timo


Am 1/17/18 um 5:08 PM schrieb Luigi Sgaglione:
Hi,

I am a beginner in Flink and I'm trying to deploy a simple example using a java client in a remote Flink server (1.4.0).

I'm using org.apache.flink.client.program.Client

this is the used code:
Configuration config = new Configuration();
config.setString("jobmanager.rpc.address", "192.168.149.130");
config.setInteger("jobmanager.rpc.port", 6123);

Client c = new Client(config);

PackagedProgram prg = new PackagedProgram(new File("target/Test-1.jar"));
c.runDetached(prg, 1);

but when I try to deploy the jar I receive the following error:

16:03:20,035 INFO  org.apache.flink.client.program.Client                        - Looking up JobManager
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
at org.apache.flink.client.program.Client.runDetached(Client.java:380)
at org.apache.flink.client.program.Client.runDetached(Client.java:355)
at org.apache.flink.client.program.Client.runDetached(Client.java:340)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
at flink.Job.main(Job.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runDetached(Client.java:279)
at flink.DeployJob.main(DeployJob.java:24)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:102)
at org.apache.flink.client.program.Client.getJobManagerGateway(Client.java:567)
at org.apache.flink.client.program.Client.runDetached(Client.java:378)
... 15 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:116)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:100)
... 17 more



Maybe I missed some configuration of the client.
Can you help me to solve the problem?

Thanks