Flink on Minikube

Flink on Minikube

Sandeep khanzode
Hello,

I have a fat JAR built with the Maven Shade plugin, and everything works correctly when I deploy it on a standalone local cluster, i.e. one JobManager and one TaskManager node.

But I installed Minikube, and the same JAR file packaged into a Docker image fails with weird serialization errors:

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner


… or in certain cases, if I comment out everything except the Kafka Source, then ...
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.common.requests.MetadataRequest$Builder


Is there anything I am missing with the Minikube setup? I initially tried with the steps for the Job Application cluster on the website, but I was unable to get the /usrlib mounted from the hostpath. 


So, I created a simple docker image from ...
apache/flink:1.12.1-java11

But I have not had any success getting the same job to run here. Please let me know if there are well-known steps or issues that I can check.

Thanks,
Sandeep

Re: Flink on Minikube

Arvid Heise-4
Hi Sandeep,

The first error definitely indicates a classloading issue, which may also be the cause of the second error.

Can you describe where you put your jar inside the docker image and which execution mode you are using? As a general rule, the jar is not supposed to go into flink/lib.
Also make sure to never shade non-connector classes of Flink into your jar. A typical user jar should be ~1MB.

On Fri, Mar 19, 2021 at 8:58 PM Sandeep khanzode <[hidden email]> wrote:

Re: Flink on Minikube

Sandeep khanzode
Hi Arvid,

I copy the JAR to the usrlib folder. This works in the Cloud EKS cluster. I wanted to set this up for my testing purposes.

Below is the Dockerfile:
FROM apache/flink:1.12.1-java11
RUN mv /opt/flink/opt/flink-queryable-state-runtime_2.12-1.12.1.jar /opt/flink/lib/flink-queryable-state-runtime_2.12-1.12.1.jar
ADD myJar.jar /opt/flink/usrlib/myJar.jar

… But, in my process, this is a fat JAR created by the Maven Shade Plugin. Are you saying that Flink classes should not be part of the user JAR at all? How does that work? Do we set the scope of the Flink dependencies to compile (or, not runtime)? Are there any samples/examples that show this? That would be really helpful.


On 22-Mar-2021, at 8:00 PM, Arvid Heise <[hidden email]> wrote:


Re: Flink on Minikube

Arvid Heise-4
Hi Sandeep,

Please have a look at [1]. You should declare most Flink dependencies with scope provided; the exceptions are connectors (or, in general, anything that is not in flink/lib/ or flink/plugins/).
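
As a rough illustration of that advice (a sketch, not taken from the linked page), a pom.xml fragment might look like the following. The artifact IDs and the 1.12.1 / Scala 2.12 coordinates are assumptions chosen to match the apache/flink:1.12.1-java11 image mentioned in this thread; adjust them to your own build. Core Flink modules that already ship in flink/lib are marked provided, so the Maven Shade plugin leaves them out of the fat JAR, while the Kafka connector keeps the default compile scope and is bundled.

```xml
<dependencies>
  <!-- Core Flink APIs: already present in the Flink distribution (flink/lib),
       so they are marked "provided" and not packaged into the user JAR. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.1</version>
    <scope>provided</scope>
  </dependency>

  <!-- Connector: not part of flink/lib, so it stays at the default
       (compile) scope and is bundled into the fat JAR. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.1</version>
  </dependency>
</dependencies>
```

With scopes set this way, the shaded JAR should contain only your own classes plus the connector and its transitive dependencies, which is what keeps a typical user JAR small.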


On Tue, Mar 23, 2021 at 5:28 AM Sandeep khanzode <[hidden email]> wrote:

Re: Flink on Minikube

Sandeep khanzode
Hi Arvid,

Thanks, I will set the scope to provided and try that.

Are there public examples on GitHub that demonstrate a sample app on Minikube?

Sandeep

On 23-Mar-2021, at 3:17 PM, Arvid Heise <[hidden email]> wrote:


Re: Flink on Minikube

rmetzger0
Hey Sandeep,

Here's a project I recently worked on that deploys Flink on Minikube: https://github.com/rmetzger/flink-reactive-mode-k8s-demo
The project is pretty big, but I guess you can pick out the bits related to the Flink deployment on Minikube.

On Thu, Mar 25, 2021 at 7:48 PM Sandeep khanzode <[hidden email]> wrote: