I would like to deploy Flink on a local cluster built with KIND for the purposes of e2e testing. The Flink app is one of the components running within the system, which otherwise consists mostly of components written in Golang. I was wondering what the simplest way to deploy the Flink job for this purpose would be. Any comments on how to accomplish that would be much appreciated.

I was thinking of something REALLY simple: using the local execution environment to put everything Flink-related in a single container. For instance, locally I run my job with `sbt run`; something along those lines could do the trick, but that is probably a very naive approach.
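To make this a bit more concrete, here is a rough sketch of the kind of single-container setup I have in mind, where the container simply builds and runs the job with sbt (the base image and tag are only placeholders for any JDK+sbt image):

```
# Naive sketch: run the job via `sbt run` inside the container.
# Base image/tag are placeholders; any image with a JDK and sbt would do.
FROM hseeberger/scala-sbt:11.0.10_1.4.7_2.12.13
WORKDIR /app
# Copy the sbt project (build.sbt, project/, src/) into the image
COPY . .
# Compile and run the Flink job on the local execution environment
CMD ["sbt", "run"]
```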
Hi Salva,

I think we have the following options to make a Flink application run on a Kubernetes cluster.

1. Local cluster
This is what you have in mind. Flink is then really just a common Java application, which you can start easily.

2. Standalone cluster on K8s
By applying some yaml files, you can create a Flink application cluster. Compared with the local mode, you can have more TaskManagers to get better performance and isolation. You can also enable the high-availability mode[2] to eliminate the SPOF of your Flink application.

3. K8s operator
Using a K8s operator makes the Flink job submission easier. It can also help with managing the whole lifecycle of the Flink application, including upgrading, restarting, etc. Refer to the GCP flink-on-k8s-operator[3] for more information.

4. Native Flink on K8s
Flink can also natively interact with a K8s cluster if the kube config is configured correctly. You need to have a local Flink binary and run a command (e.g. `flink run-application ...`)[4]. This can also be done in a K8s pod started from the official Flink image.

Best,
Yang
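PS: For completeness, option 4 boils down to a single command along the following lines (just a sketch; the cluster id, image name and jar path are placeholders you would adapt to your setup):

```
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-flink-app \
    -Dkubernetes.container.image=my-repo/my-flink-job:latest \
    local:///opt/flink/usrlib/my-job.jar
```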
Hi Yang,

Many thanks for the summary of the different options. For now, as I mentioned, I am interested in the simplest approach, since my purpose is to run some smoke (e2e) tests. It is not entirely clear to me how to run Flink using option 1. I'm currently using the official Scala template (https://github.com/tillrohrmann/flink-project.g8/blob/master/src/main/g8/build.sbt):

```
ThisBuild / resolvers ++= Seq(
  "Apache Development Snapshot Repository" at
    "https://repository.apache.org/content/repositories/snapshots/",
  Resolver.mavenLocal
)

name := "$name$"
version := "$version$"
organization := "$organization$"

ThisBuild / scalaVersion := "$scala_version$"

val flinkVersion = "$flink_version$"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided")

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies
  )

assembly / mainClass := Some("$organization$.Job")

// make run command include the provided dependencies
Compile / run := Defaults.runTask(Compile / fullClasspath,
                                  Compile / run / mainClass,
                                  Compile / run / runner
                                 ).evaluated

// stays inside the sbt console when we press "ctrl-c" while a Flink programme executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true

// exclude Scala library from assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
```

For local manual testing, I just run `sbt run` in the terminal. I would like to execute the same thing inside a Docker container, but without having to run `sbt run`; instead, I would like to run the jar directly. The jar for my job is produced by `sbt assembly`, but it does not include the `provided` dependencies (see above), so I cannot run it with `java -jar my.jar`: those dependencies are not on the classpath.

As a quick check, if I remove the "provided" annotations above and replace `includeScala = false` with `includeScala = true` in order to build a self-contained fat jar, and then execute it with `java -jar my.jar`, I get this error:

"Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application."

I've just come across this Jira ticket: https://issues.apache.org/jira/browse/FLINK-19968, which seems to indicate that this is indeed a bug (I'm also using Flink 1.11.2) and that what I am doing should theoretically work. In any case, is this the right approach for option 1 (local cluster) in your list? Again, what I want is a single container that runs the Flink job jar as a regular Java application.

Thanks again for your support!
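PS: One workaround I have seen suggested for the ExecutorFactory error (an untested assumption on my part: the executor implementations appear to be discovered via `java.util.ServiceLoader`) is to make sure sbt-assembly concatenates the `META-INF/services` files instead of dropping them when building the fat jar:

```
// Untested sketch: preserve ServiceLoader registrations from all
// dependencies by concatenating META-INF/services entries in the fat jar.
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  case x =>
    val oldStrategy = (assembly / assemblyMergeStrategy).value
    oldStrategy(x)
}
```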
Can anyone explain why I am getting this error?

"Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application."

I have tried a slightly different approach: running the jar that `sbt assembly` produces inside a container built from a Dockerfile that looks like this:

```
FROM flink:1.11.2-scala_2.12-java11
COPY ./path/to/my.jar my.jar
```

I have tried with different versions of Flink (1.10.2, for example) and I am getting the same error. This should be pretty straightforward, but I cannot make it work. My main looks like this:

```
object MyApp extends App {
  ...
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  ...
  env.execute
}
```

and it fails when it reaches the call to `getExecutionEnvironment`...
You could directly run the Flink job in local mode with the java command, but then you need to construct the Flink classpath yourself and pass it to the start command. An easier way is to build your image based on the official Flink image and copy your jar into it; I believe you have already done this. Then you could use the following yaml to create a K8s Job that starts the Flink job in local mode. Refer here[1] for more information about how to build a runnable jar.

```
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-local-job
spec:
  template:
    metadata:
      labels:
        app: flink-local-job
    spec:
      restartPolicy: OnFailure
      containers:
        - name: flink-local-job
          image: flink:1.11.3
          args: ["bash", "-c", "/opt/flink/bin/flink run --target local /opt/flink/examples/streaming/WindowJoin.jar"]
```

Best,
Yang
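PS: once you have saved the yaml (the file name below is up to you), trying it out amounts to:

```
kubectl apply -f flink-local-job.yaml
kubectl logs -f job/flink-local-job
```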
Hi Yang,

Thanks for your reply. I've given it a try within my container (built from the Dockerfile I copy-pasted above), but running `flink run --target local my.jar` results in a different error for me:

"org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not create actor system"

Before executing `flink run --target local my.jar` I started a Flink cluster via `start-cluster.sh`...

Salva
Actually, you do not need to start a Flink cluster beforehand, because a local cluster is started automatically in the same process as the CliFrontend. The local cluster is "all-in-one-process". `start-cluster.sh`, by contrast, starts one JobManager process and one TaskManager process; that is a standalone cluster.

Compared to the above example, the only change needed is to replace WindowJoin.jar with the path to your own user jar.

Best,
Yang
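PS: with the Dockerfile you posted earlier, the jar should end up in the image's working directory (/opt/flink for the official image, if I am not mistaken), so the container args would become something like:

```
args: ["bash", "-c", "/opt/flink/bin/flink run --target local /opt/flink/my.jar"]
```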
The example you provided works just fine, but when I replace the jar with mine I get this error instead:

```
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not create actor system
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.Exception: Could not create actor system
    at org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:263)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:341)
    at org.apache.flink.runtime.minicluster.MiniCluster.createLocalRpcService(MiniCluster.java:793)
    at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:271)
    at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:87)
    at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1810)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1679)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:688)
    at ...
```

Any thoughts on this? However, my job runs just fine with `sbt run` or from IntelliJ...
Hi Yang,

Just to say that I finally found the problem:

```
lazy val log4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.13.3" % "provided"
```

This dependency was not included in my jar (but sbt was including it for the run task, which is why it worked with `sbt run`). After adding this dependency to the jar (or providing the library as a jar in the container), everything worked as expected.

Thanks a lot for your help!
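PS: in case it helps someone else, the fix on my side amounts to dropping the `provided` scope so that the logging backend ends up inside the assembled jar (version as in my build):

```
// Before: the "provided" scope kept log4j-slf4j-impl out of the fat jar
// lazy val log4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.13.3" % "provided"

// After: bundle it with the job
lazy val log4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.13.3"
libraryDependencies += log4j
```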
I am not very familiar with Scala, but I am glad to hear that you solved the problem.

Best,
Yang