Hi,
I am a beginner with Flink and I'm trying to deploy a simple example from a Java client to a remote Flink server (1.4.0). I'm using org.apache.flink.client. This is the code I used:
But when I try to deploy the jar I receive the following error:

16:03:20,035 INFO org.apache.flink.client.
Exception in thread "main" org.apache.flink.client.
    at org.apache.flink.client.
    at org.apache.flink.client.
    at org.apache.flink.client.
    at org.apache.flink.client.
    at org.apache.flink.api.java.
    at org.apache.flink.api.java.
    at org.apache.flink.api.java.
    at flink.Job.main(Job.java:67)
    at sun.reflect.
    at sun.reflect.
    at sun.reflect.
    at java.lang.reflect.Method.
    at org.apache.flink.client.
    at org.apache.flink.client.
    at org.apache.flink.client.
    at flink.DeployJob.main(
Caused by: org.apache.flink.runtime.
    at org.apache.flink.runtime.util.
    at org.apache.flink.client.
    at org.apache.flink.client.
    ... 15 more
Caused by: java.util.concurrent.
    at scala.concurrent.impl.Promise$
    at scala.concurrent.impl.Promise$
    at scala.concurrent.Await$$
    at scala.concurrent.BlockContext$
    at scala.concurrent.Await$.
    at scala.concurrent.Await.result(
    at org.apache.flink.runtime.util.
    ... 17 more

Maybe I missed some configuration of the client. Can you help me to solve the problem? Thanks
|
Hi Luigi,
Can you try loading an entire configuration file via GlobalConfiguration.loadConfiguration(flinkConfigDir)? Could you also tell us a little about what you want to achieve? Is programmatic submission a requirement for you? Did you consider using the RemoteStreamEnvironment?
Regards,
Timo
On 1/17/18 at 5:08 PM, Luigi Sgaglione wrote:
|
Hi Timo,
my objective is to create a web interface that allows me to edit and deploy jobs on Flink. To do so, I'm evaluating all the possibilities provided by the Flink APIs. What do you think is the best solution?
Thanks
2018-01-18 9:39 GMT+01:00 Timo Walther <[hidden email]>:
|
Hi Luigi,
I'm also working on a solution for submitting jobs programmatically. You can look into my working branch [1]. As far as I know, the best and most stable solution is to use the ClusterClient. However, this is an internal API and might change. You could also use Flink's REST API to submit a job [2].
Regards,
Timo
[1] https://github.com/twalthr/flink/blob/FLINK-7594_rebased/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#submitting-programs
On 1/18/18 at 11:41 AM, Luigi Sgaglione wrote:
|
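For readers following the REST route in [2]: once a jar has been uploaded, the same documentation describes starting it with a POST to /jars/:jarid/run. The sketch below only builds that URL and sends nothing. The class name FlinkRunUrl, the jar id abc123_Test-1.jar, and the parallelism value are hypothetical; flink.Job is the entry class that appears in the stack trace earlier in this thread. The query parameter names (entry-class, parallelism) are my reading of the 1.4 documentation and should be checked against [2].

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of Flink: builds the URL for POST /jars/:jarid/run.
final class FlinkRunUrl {

    static String runUrl(String baseUrl, String jarId, String entryClass, int parallelism)
            throws UnsupportedEncodingException {
        // URLEncoder performs form encoding (a space becomes '+'), so this
        // assumes the jar id contains no spaces, as with ids like "<uuid>_Test-1.jar".
        return baseUrl + "/jars/" + URLEncoder.encode(jarId, "UTF-8") + "/run"
                + "?entry-class=" + URLEncoder.encode(entryClass, "UTF-8")
                + "&parallelism=" + parallelism;
    }

    public static void main(String[] args) throws Exception {
        // The jar id is an assumption; use the id reported by the server after the upload.
        System.out.println(runUrl("http://192.168.149.130:8081", "abc123_Test-1.jar", "flink.Job", 1));
    }
}
```

The resulting string can be passed to new URL(...) and opened with HttpURLConnection using the POST method, in the same way as the upload request discussed in this thread.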
Hi Timo,
I think that the REST API is the most suitable solution. Thanks.

So, I'm trying to use the Flink REST API. I'm able to perform GET requests but not POST requests. In particular, when I issue a POST to upload the jar, I receive this error from the server:

{"error": "Failed to upload the file."}

This is the code I used:

    URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");
    HttpURLConnection urlConnection = (HttpURLConnection) serverUrl.openConnection();
    String boundaryString = "------BoundaryXXXX";
    String crlf = "\r\n";
    String fileUrl = "Test-1.jar";
    File jarToUpload = new File(fileUrl);

    urlConnection.setDoOutput(true);
    urlConnection.setRequestMethod("POST");
    urlConnection.setRequestProperty("Connection", "Keep-Alive");
    urlConnection.setRequestProperty("Cache-Control", "no-cache");
    urlConnection.addRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundaryString);

    OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
    BufferedWriter httpRequestBodyWriter =
        new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));

    String payloadString = boundaryString + crlf
        + "Content-Disposition: form-data;"
        + " name=\"jarfile\";"
        + " filename=\"Test-1.jar\"" + crlf
        + "Content-Type: application/x-java-archive" + crlf + crlf;
    System.out.println(payloadString);
    httpRequestBodyWriter.write(payloadString);
    httpRequestBodyWriter.flush();

    // Write the actual file contents
    FileInputStream inputStream = new FileInputStream(jarToUpload);
    int bytesRead;
    byte[] dataBuffer = new byte[1024];
    while ((bytesRead = inputStream.read(dataBuffer)) != -1) {
        outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
    }
    outputStreamToRequestBody.flush();

    httpRequestBodyWriter.write(boundaryString + crlf);
    httpRequestBodyWriter.flush();

    inputStream.close();
    outputStreamToRequestBody.close();
    httpRequestBodyWriter.close();

    BufferedReader httpResponseReader =
        new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
    String lineRead;
    while ((lineRead = httpResponseReader.readLine()) != null) {
        System.out.println(lineRead);
    }

The documentation of the Flink REST API is not very detailed; in particular, it doesn't include a clear example. Do you have any idea how to solve the error?
Thanks
2018-01-18 12:54 GMT+01:00 Timo Walther <[hidden email]>:
|
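When debugging a failing POST like the one above, it can help to print the HTTP status together with the body. With HttpURLConnection, getInputStream() throws an IOException for 4xx and 5xx responses, and the server's message is then available from getErrorStream(). The following is a minimal sketch of that idea; the class name ResponseReader and its method names are hypothetical, and nothing here is specific to Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: returns "<status> <body>" for success and error responses alike.
final class ResponseReader {

    // Reads a stream to the end and decodes it as UTF-8; a null stream yields "".
    static String readAll(InputStream in) throws IOException {
        if (in == null) {
            return "";
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Picks the error stream for 4xx/5xx responses, the input stream otherwise.
    static String readResponse(HttpURLConnection connection) throws IOException {
        int status = connection.getResponseCode();
        InputStream body = status >= 400 ? connection.getErrorStream() : connection.getInputStream();
        try {
            return status + " " + readAll(body);
        } finally {
            if (body != null) {
                body.close();
            }
        }
    }
}
```

Calling ResponseReader.readResponse(urlConnection) in place of the final read loop would show the status code next to a JSON error message such as the one quoted above.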
Solved. This is the correct code to deploy a job programmatically via the REST API. Thanks.

    import java.io.*;
    import java.net.HttpURLConnection;
    import java.net.URL;

    URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");
    HttpURLConnection urlConnection = (HttpURLConnection) serverUrl.openConnection();
    String boundaryString = "------BoundaryXXXX";
    String crlf = "\r\n";
    String fileUrl = "Test-1.jar";
    File jarToUpload = new File(fileUrl);

    urlConnection.setUseCaches(false);
    urlConnection.setDoOutput(true);
    urlConnection.setDoInput(true);
    urlConnection.setRequestMethod("POST");
    urlConnection.setRequestProperty("Connection", "Keep-Alive");
    urlConnection.setRequestProperty("Cache-Control", "no-cache");
    urlConnection.addRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundaryString);

    OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
    BufferedWriter httpRequestBodyWriter =
        new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));

    // Include the section to describe the file.
    // Each part starts with "--" followed by the boundary.
    String payloadString = "--" + boundaryString + crlf
        + "Content-Disposition: form-data;"
        + " name=\"jarfile\";"
        + " filename=\"Test-1.jar\"" + crlf
        + "Content-Type: application/x-java-archive" + crlf + crlf;
    System.out.println(payloadString);
    httpRequestBodyWriter.write(payloadString);
    httpRequestBodyWriter.flush();

    // Write the actual file contents
    FileInputStream inputStream = new FileInputStream(jarToUpload);
    int bytesRead;
    byte[] dataBuffer = new byte[1024];
    while ((bytesRead = inputStream.read(dataBuffer)) != -1) {
        outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
    }
    outputStreamToRequestBody.flush();

    // Close the multipart body: CRLF, then "--" + boundary + "--"
    httpRequestBodyWriter.write(crlf + "--" + boundaryString + "--" + crlf);
    httpRequestBodyWriter.flush();

    inputStream.close();
    outputStreamToRequestBody.close();
    httpRequestBodyWriter.close();

    BufferedReader httpResponseReader =
        new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
    String lineRead;
    while ((lineRead = httpResponseReader.readLine()) != null) {
        System.out.println(lineRead);
    }

Best Regards
Luigi
2018-01-18 17:02 GMT+01:00 Luigi Sgaglione <[hidden email]>:
|
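The difference between the failing and the working request is the multipart framing: the part has to open with "--" plus the boundary, and the body has to end with CRLF, "--", the boundary, and a final "--". As a rough sketch, that framing can be isolated in a small helper that writes to any OutputStream, which also avoids mixing a BufferedWriter with raw byte writes. The class name MultipartJarBody and the short boundary used in main are hypothetical; the field name jarfile and the content type are taken from the code in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: writes a single-file multipart/form-data body.
final class MultipartJarBody {

    private static final String CRLF = "\r\n";

    static void write(OutputStream out, String boundary, String fileName, InputStream jar)
            throws IOException {
        // Opening delimiter and part headers, followed by an empty line.
        String header = "--" + boundary + CRLF
                + "Content-Disposition: form-data; name=\"jarfile\"; filename=\"" + fileName + "\"" + CRLF
                + "Content-Type: application/x-java-archive" + CRLF + CRLF;
        out.write(header.getBytes(StandardCharsets.UTF_8));

        // Raw file bytes, copied unchanged.
        byte[] buffer = new byte[1024];
        int n;
        while ((n = jar.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }

        // Closing delimiter: CRLF, "--", boundary, "--".
        String footer = CRLF + "--" + boundary + "--" + CRLF;
        out.write(footer.getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        // Stand-in bytes instead of a real jar, to make the framing visible.
        write(body, "BoundaryXXXX", "Test-1.jar",
                new ByteArrayInputStream("PK".getBytes(StandardCharsets.UTF_8)));
        System.out.print(new String(body.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

For a real upload, the OutputStream would be urlConnection.getOutputStream() and the InputStream a FileInputStream on the jar, with the same boundary string declared in the Content-Type header.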