Simplest way to deploy flink job on k8s for e2e testing purposes


Salva Alcántara
I would like to deploy Flink on a local cluster built with KIND for the
purposes of e2e testing. The Flink app is one of the components running
within the system, which consists of other components (mostly written in
Golang). I was wondering what would be the simplest way to deploy the
Flink job for such purposes. Any comments on how to accomplish that would be
much appreciated. I was thinking of something REALLY simple: using the local
execution environment to put everything (Flink-related) in a single
container. For instance, locally I run my job with `sbt run`; something
along these lines could do the trick, but that is probably a very naive
approach.




Re: Simplest way to deploy flink job on k8s for e2e testing purposes

Yang Wang
Hi Salva,

I think we have the following options to make a Flink application run on a Kubernetes cluster.

1. Local cluster
This is what you have in mind. Flink is now very much like an ordinary Java application, which you can start easily.
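
For example, a minimal sketch (the jar path and main class below are placeholders; /opt/flink/lib is where the official image keeps the Flink distribution jars):

```
# Run the assembled job jar in local mode as a plain Java process;
# the Flink distribution jars must be on the classpath next to the job jar.
java -cp "/my.jar:/opt/flink/lib/*" org.example.MyJob
```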

2. Standalone cluster on K8s
By applying some YAML files, you can create a Flink application cluster. Compared with the local mode, you can
have more TaskManagers for better performance and isolation. You can also enable the high-availability mode[2]
to eliminate the SPOF of your Flink application.
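
For example, following the standalone Kubernetes setup in the Flink documentation (the manifest names below come from those examples and can differ between Flink versions):

```
# Bring up a minimal standalone session cluster from the example
# manifests in the Flink docs (file names are version-dependent).
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-session-deployment.yaml
kubectl create -f taskmanager-session-deployment.yaml
```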

3. K8s operator
Using a K8s operator makes Flink job submission easier. It can also help with managing the whole lifecycle of
the Flink application, including upgrading, restarting, etc. Refer to the GCP flink-on-k8s-operator[3] for more information.
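
With the operator installed, submitting a job essentially reduces to applying a custom resource, e.g. (the manifest name below is a placeholder; see the operator's sample manifests for the actual FlinkCluster schema):

```
# Submit a job as a FlinkCluster custom resource and let the
# operator manage its lifecycle.
kubectl apply -f my-flink-cluster.yaml
kubectl get flinkclusters
```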

4. Native Flink on K8s
Flink can also interact natively with a K8s cluster if the kube config is set up correctly. You need a local Flink
binary and run a command (e.g. `flink run-application ...`)[4]. This can also be done in a K8s pod started from the
official Flink image.
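
For example, in application mode (available since Flink 1.11; the cluster-id and image name below are placeholders):

```
# Deploy the job natively on K8s in application mode; Flink itself
# talks to the API server using the local kube config.
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-e2e-cluster \
    -Dkubernetes.container.image=my-flink-job-image \
    local:///opt/flink/usrlib/my.jar
```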



Best,
Yang


Re: Simplest way to deploy flink job on k8s for e2e testing purposes

Salva Alcántara
Hi Yang,

Many thanks for the summary of the different options. For now, as I
mentioned, I am interested in the simplest approach, since my purpose is to
run some smoke (e2e) tests. It is not entirely clear to me how to run Flink
using option 1. I'm currently using the official Scala template
(https://github.com/tillrohrmann/flink-project.g8/blob/master/src/main/g8/build.sbt):

```
ThisBuild / resolvers ++= Seq(
  "Apache Development Snapshot Repository" at
    "https://repository.apache.org/content/repositories/snapshots/",
  Resolver.mavenLocal
)

name := "$name$"

version := "$version$"

organization := "$organization$"

ThisBuild / scalaVersion := "$scala_version$"

val flinkVersion = "$flink_version$"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided")

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies
  )

assembly / mainClass := Some("$organization$.Job")

// make the run command include the provided dependencies
Compile / run := Defaults.runTask(Compile / fullClasspath,
                                  Compile / run / mainClass,
                                  Compile / run / runner
                                 ).evaluated

// stay inside the sbt console when pressing "ctrl-c" while a Flink
// program executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true

// exclude the Scala library from the assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
```

For local manual testing, I just run `sbt run` in the terminal. I would
like to execute the same thing inside a Docker container, but without
having to run `sbt run`; instead, I would like to run the jar directly.

The jar for my job is produced by `sbt assembly`, but it does not include
the `provided` dependencies (see above), so I cannot run it with
`java -jar my.jar`: those dependencies are not on the classpath.

As a quick check, if I remove the "provided" annotations above and replace
`includeScala = false` with `includeScala = true` in order to build a
self-contained fat jar, and then execute it with `java -jar my.jar`, I get
this error:

"Exception in thread "main" java.lang.IllegalStateException: No
ExecutorFactory found to execute the application."

I've just come across this Jira ticket:
https://issues.apache.org/jira/browse/FLINK-19968

which seems to indicate that this is indeed a bug (I'm also using Flink
1.11.2) and that, in theory, what I am doing should work.

In any case, is this the right approach for option 1 (local cluster) in
your list? Again, what I want is a single container that runs the Flink
job jar as a regular Java application.

Thanks again for your support!

Re: Simplest way to deploy flink job on k8s for e2e testing purposes

Salva Alcántara
Can anyone explain why I am getting this error?

"Exception in thread "main" java.lang.IllegalStateException: No
ExecutorFactory found to execute the application."

I have tried a slightly different approach: running the jar that `sbt
assembly` produces inside a container built from this Dockerfile:

```
FROM flink:1.11.2-scala_2.12-java11
COPY ./path/to/my.jar my.jar
```

I have tried different versions of Flink (1.10.2, for example) and I get
the same error. This should be pretty straightforward, but I cannot make
it work. My main looks like this:

```
object MyApp extends App {
  ...
  val env: StreamExecutionEnvironment =
    StreamExecutionEnvironment.getExecutionEnvironment
  ...
  env.execute
}
```

and it fails when it reaches the call to `getExecutionEnvironment`...





Re: Simplest way to deploy flink job on k8s for e2e testing purposes

Yang Wang
You could run the Flink job directly in local mode with the `java` command, but then you
need to construct the Flink classpath yourself and put it on the start command. An easier
way is to build your image on top of the official Flink image and copy your jar into
it; I believe you have already done this. Then you can use the following YAML to
create a K8s Job that starts the Flink job in local mode.

Refer here[1] for more information about how to build a runnable jar.


```
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-local-job
spec:
  template:
    metadata:
      labels:
        app: flink-local-job
    spec:
      restartPolicy: OnFailure
      containers:
        - name: flink-local-job
          image: flink:1.11.3
          args: ["bash", "-c", "/opt/flink/bin/flink run --target local /opt/flink/examples/streaming/WindowJoin.jar"]
```
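
Assuming you save the manifest as flink-local-job.yaml (the file name is up to you), usage would then be:

```
kubectl apply -f flink-local-job.yaml
kubectl logs -f job/flink-local-job
```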



Best,
Yang



Re: Simplest way to deploy flink job on k8s for e2e testing purposes

Salva Alcántara
Hi Yang,

Thanks for your reply. I've given it a try inside my container (I
copy-pasted the Dockerfile for it), but running `flink run --target local
my.jar` results in a different error for me:

"org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Could not create actor system"

Before executing `flink run --target local my.jar` I started a Flink
cluster via `start-cluster.sh`...

Salva




Re: Simplest way to deploy flink job on k8s for e2e testing purposes

Yang Wang
Actually, you do not need to start a Flink cluster beforehand, because in local mode a cluster is started
automatically in the same process as the CliFrontend. Local mode means "all-in-one-process";
`start-cluster.sh`, by contrast, starts one JobManager process and one TaskManager process, i.e. a standalone cluster.

Compared to the example above, the only change is to replace WindowJoin.jar with the path to your own user jar.
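
For example, assuming your jar is copied into the image at /opt/flink/usrlib/my.jar (an illustrative path), the command in the args line becomes:

```
/opt/flink/bin/flink run --target local /opt/flink/usrlib/my.jar
```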

Best,
Yang


Re: Simplest way to deploy flink job on k8s for e2e testing purposes

Salva Alcántara
The example you provided works just fine, but when I replace the jar with
mine I get this error instead:

```
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not create actor system
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.Exception: Could not create actor system
        at org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:263)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:341)
        at org.apache.flink.runtime.minicluster.MiniCluster.createLocalRpcService(MiniCluster.java:793)
        at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:271)
        at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:87)
        at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1810)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1679)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:688)
        at ...
```

Any thoughts on this? My job runs just fine with `sbt run` or from
IntelliJ...




Re: Simplest way to deploy flink job on k8s for e2e testing purposes

Salva Alcántara
Hi Yang,

Just to say that I finally found the problem:

```
lazy val log4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.13.3" % "provided"
```

This dependency was not included in my jar (but sbt was putting it on the
classpath for the run task, which is why it worked with `sbt run`). After
adding this dependency to the jar (by dropping the "provided" qualifier) or
providing the library as a jar in the container, everything worked as
expected.

Thanks a lot for your help!





Re: Simplest way to deploy flink job on k8s for e2e testing purposes

Yang Wang
I am not very familiar with Scala, but I'm glad to hear that you solved the problem.

Best,
Yang
