[BUG?] Cannot Load User Class on Local Environment

classic Classic list List threaded Threaded
15 messages Options
Reply | Threaded
Open this post in threaded view
|

[BUG?] Cannot Load User Class on Local Environment

Matt
Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Stefan Richter
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215

Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Matt
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215


Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Matt
I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215



Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Stefan Richter
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215




Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Matt
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215





Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Till Rohrmann
I just copied my response because my other email address is not accepted on the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite node a new Flink mini cluster. These mini cluster won't communicate with each other and run independently. Is this what you intend to do?


Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <[hidden email]> wrote:
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215






Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Matt
Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only processes the elements stored on the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.

Any insight on this?

Cheers,
Matt


On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <[hidden email]> wrote:
I just copied my response because my other email address is not accepted on the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite node a new Flink mini cluster. These mini cluster won't communicate with each other and run independently. Is this what you intend to do?


Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <[hidden email]> wrote:
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215







Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Matt
Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning) and I still find the same problem as before. If I'm not wrong your PR was merged already, so your fixes should be part of the binary.

I hope you have time to have a look at the test case in [1].

Best,
Matt


On Thu, Apr 27, 2017 at 10:09 AM, Matt <[hidden email]> wrote:
Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only processes the elements stored on the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.

Any insight on this?

Cheers,
Matt


On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <[hidden email]> wrote:
I just copied my response because my other email address is not accepted on the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite node a new Flink mini cluster. These mini cluster won't communicate with each other and run independently. Is this what you intend to do?


Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <[hidden email]> wrote:
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215








Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Till Rohrmann

Hi Matt,

alright, then we have to look into it again. I tried to run your example, however, it does not seem to be self-contained. Using Ignite 2.0.0 with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in Ignite#start. In the logs I see the following warning:

May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.

However, I assume that this is not critical.

Maybe you can tell me how I can run your example in order to debug it.

Cheers,
Till


On Mon, May 15, 2017 at 10:05 PM, Matt <[hidden email]> wrote:
Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning) and I still find the same problem as before. If I'm not wrong your PR was merged already, so your fixes should be part of the binary.

I hope you have time to have a look at the test case in [1].

Best,
Matt


On Thu, Apr 27, 2017 at 10:09 AM, Matt <[hidden email]> wrote:
Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only processes the elements stored on the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.

Any insight on this?

Cheers,
Matt


On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <[hidden email]> wrote:
I just copied my response because my other email address is not accepted on the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite node a new Flink mini cluster. These mini cluster won't communicate with each other and run independently. Is this what you intend to do?


Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <[hidden email]> wrote:
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215









Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Matt
Thanks for your help Till.

I will create a self contained test case in a moment and send you the link, wait for it.

Cheers,
Matt

On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <[hidden email]> wrote:

Hi Matt,

alright, then we have to look into it again. I tried to run your example, however, it does not seem to be self-contained. Using Ignite 2.0.0 with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in Ignite#start. In the logs I see the following warning:

May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.

However, I assume that this is not critical.

Maybe you can tell me how I can run your example in order to debug it.

Cheers,
Till


On Mon, May 15, 2017 at 10:05 PM, Matt <[hidden email]> wrote:
Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning) and I still find the same problem as before. If I'm not wrong your PR was merged already, so your fixes should be part of the binary.

I hope you have time to have a look at the test case in [1].

Best,
Matt


On Thu, Apr 27, 2017 at 10:09 AM, Matt <[hidden email]> wrote:
Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only processes the elements stored on the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.

Any insight on this?

Cheers,
Matt


On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <[hidden email]> wrote:
I just copied my response because my other email address is not accepted on the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite node a new Flink mini cluster. These mini cluster won't communicate with each other and run independently. Is this what you intend to do?


Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <[hidden email]> wrote:
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215










Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Matt
Check the repo at [1].

The important step which I think is what you missed is running an Ignite node on your computer so the Java code, which launches an Ignite client on the JVM, connects to it and executes Flink on that node on a local environment.

Be aware "peerClassLoadingEnabled" should be enabled (as in ignite.xml), because it must match the config on the client node.

If you follow the Readme file it's everything there, if you have any problem let me know!

Cheers,
Matt


On Wed, May 17, 2017 at 3:49 PM, Matt <[hidden email]> wrote:
Thanks for your help Till.

I will create a self contained test case in a moment and send you the link, wait for it.

Cheers,
Matt

On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <[hidden email]> wrote:

Hi Matt,

alright, then we have to look into it again. I tried to run your example, however, it does not seem to be self-contained. Using Ignite 2.0.0 with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in Ignite#start. In the logs I see the following warning:

May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.

However, I assume that this is not critical.

Maybe you can tell me how I can run your example in order to debug it.

Cheers,
Till


On Mon, May 15, 2017 at 10:05 PM, Matt <[hidden email]> wrote:
Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning) and I still find the same problem as before. If I'm not wrong your PR was merged already, so your fixes should be part of the binary.

I hope you have time to have a look at the test case in [1].

Best,
Matt


On Thu, Apr 27, 2017 at 10:09 AM, Matt <[hidden email]> wrote:
Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only processes the elements stored on the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.

Any insight on this?

Cheers,
Matt


On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <[hidden email]> wrote:
I just copied my response because my other email address is not accepted on the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite node a new Flink mini cluster. These mini cluster won't communicate with each other and run independently. Is this what you intend to do?


Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <[hidden email]> wrote:
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215











Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Till Rohrmann
Hi Matt,

sorry for not coming back to you sooner. We're currently in the release phase and this consumes a lot of capacities.

I tried to go to the linked repo, but Github tells me that it does not exist. Have you removed it?

Cheers,
Till

On Wed, May 17, 2017 at 10:56 PM, Matt <[hidden email]> wrote:
Check the repo at [1].

The important step which I think is what you missed is running an Ignite node on your computer so the Java code, which launches an Ignite client on the JVM, connects to it and executes Flink on that node on a local environment.

Be aware "peerClassLoadingEnabled" should be enabled (as in ignite.xml), because it must match the config on the client node.

If you follow the Readme file it's everything there, if you have any problem let me know!

Cheers,
Matt


On Wed, May 17, 2017 at 3:49 PM, Matt <[hidden email]> wrote:
Thanks for your help Till.

I will create a self contained test case in a moment and send you the link, wait for it.

Cheers,
Matt

On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <[hidden email]> wrote:

Hi Matt,

alright, then we have to look into it again. I tried to run your example, however, it does not seem to be self-contained. Using Ignite 2.0.0 with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in Ignite#start. In the logs I see the following warning:

May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.

However, I assume that this is not critical.

Maybe you can tell me how I can run your example in order to debug it.

Cheers,
Till


On Mon, May 15, 2017 at 10:05 PM, Matt <[hidden email]> wrote:
Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning) and I still find the same problem as before. If I'm not wrong your PR was merged already, so your fixes should be part of the binary.

I hope you have time to have a look at the test case in [1].

Best,
Matt


On Thu, Apr 27, 2017 at 10:09 AM, Matt <[hidden email]> wrote:
Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only processes the elements stored on the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.

Any insight on this?

Cheers,
Matt


On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <[hidden email]> wrote:
I just copied my response because my other email address is not accepted on the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite node a new Flink mini cluster. These mini cluster won't communicate with each other and run independently. Is this what you intend to do?


Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <[hidden email]> wrote:
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215












Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Matt
Thanks for looking into it Till!

I'll try changing that line locally and then send a JIRA issue. When it gets officially fixed I'll probably create an Ignite-Flink connector to replace the older and less efficient one [1]. Users will be able to create Flink jobs on Ignite nodes, right where the data is stored.


Best,
Matt



Matt

On Mon, May 29, 2017 at 9:37 AM, Till Rohrmann <[hidden email]> wrote:

Hi Matt,

I looked into it and it seems that the Task does not respect the context class loader. The problem is that the local mode was not developed with the intention to be executed within something like Ignite or an application server. It rather assumes that you have a user code jar which is sent to the TaskManager. This jar is then added to an URLClassLoader which is used for user code class loading. In the case of the local execution mode, Flink assumes that all user code jars are in the system class loader (which usually holds true when running examples from the IDE). That is the reason why we don’t check the TCCL. In order to fix your problem you can replace BlobLibraryCacheManager.java:298 with this.classLoader = new FlinkUserCodeClassLoader(libraryURLs, Thread.currentThread().getContextClassLoader());. Alternatively, you can build your job, copy the user code jar to IGNITE_HOME/libs and then restart ignite.

If you want to get the TCCL problem properly fixed, I suggest to open a JIRA issue here [1].

[1] https://issues.apache.org/jira/browse/FLINK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel

Cheers,
Till


On Mon, May 29, 2017 at 12:02 PM, Matt <[hidden email]> wrote:
Hi Till,

Have you found anything or are you still busy with the release? I have no idea what may be wrong, but let me know if I can help you in any way to find what may be going on.

Best,
Matt

On Wed, May 24, 2017 at 5:37 AM, Till Rohrmann <[hidden email]> wrote:
Hi Matt,

sorry for not coming back to you sooner. We're currently in the release phase and this consumes a lot of capacities.

I tried to go to the linked repo, but Github tells me that it does not exist. Have you removed it?

Cheers,
Till

On Wed, May 17, 2017 at 10:56 PM, Matt <[hidden email]> wrote:
Check the repo at [1].

The important step which I think is what you missed is running an Ignite node on your computer so the Java code, which launches an Ignite client on the JVM, connects to it and executes Flink on that node on a local environment.

Be aware "peerClassLoadingEnabled" should be enabled (as in ignite.xml), because it must match the config on the client node.

If you follow the Readme file it's everything there, if you have any problem let me know!

Cheers,
Matt


On Wed, May 17, 2017 at 3:49 PM, Matt <[hidden email]> wrote:
Thanks for your help Till.

I will create a self contained test case in a moment and send you the link, wait for it.

Cheers,
Matt

On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <[hidden email]> wrote:

Hi Matt,

alright, then we have to look into it again. I tried to run your example, however, it does not seem to be self-contained. Using Ignite 2.0.0 with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in Ignite#start. In the logs I see the following warning:

May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.

However, I assume that this is not critical.

Maybe you can tell me how I can run your example in order to debug it.

Cheers,
Till


On Mon, May 15, 2017 at 10:05 PM, Matt <[hidden email]> wrote:
Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning) and I still find the same problem as before. If I'm not wrong your PR was merged already, so your fixes should be part of the binary.

I hope you have time to have a look at the test case in [1].

Best,
Matt


On Thu, Apr 27, 2017 at 10:09 AM, Matt <[hidden email]> wrote:
Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only processes the elements stored on the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.

Any insight on this?

Cheers,
Matt


On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <[hidden email]> wrote:
I just copied my response because my other email address is not accepted on the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite node a new Flink mini cluster. These mini cluster won't communicate with each other and run independently. Is this what you intend to do?


Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <[hidden email]> wrote:
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215















Reply | Threaded
Open this post in threaded view
|

Re: [BUG?] Cannot Load User Class on Local Environment

Till Rohrmann
Sounds great Matt :-)

On Tue, May 30, 2017 at 12:17 AM, Matt <[hidden email]> wrote:
Thanks for looking into it Till!

I'll try changing that line locally and then send a JIRA issue. When it gets officially fixed I'll probably create an Ignite-Flink connector to replace the older and less efficient one [1]. Users will be able to create Flink jobs on Ignite nodes, right where the data is stored.


Best,
Matt



Matt

On Mon, May 29, 2017 at 9:37 AM, Till Rohrmann <[hidden email]> wrote:

Hi Matt,

I looked into it and it seems that the Task does not respect the context class loader. The problem is that the local mode was not developed with the intention to be executed within something like Ignite or an application server. It rather assumes that you have a user code jar which is sent to the TaskManager. This jar is then added to an URLClassLoader which is used for user code class loading. In the case of the local execution mode, Flink assumes that all user code jars are in the system class loader (which usually holds true when running examples from the IDE). That is the reason why we don’t check the TCCL. In order to fix your problem you can replace BlobLibraryCacheManager.java:298 with this.classLoader = new FlinkUserCodeClassLoader(libraryURLs, Thread.currentThread().getContextClassLoader());. Alternatively, you can build your job, copy the user code jar to IGNITE_HOME/libs and then restart ignite.

If you want to get the TCCL problem properly fixed, I suggest to open a JIRA issue here [1].

[1] https://issues.apache.org/jira/browse/FLINK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel

Cheers,
Till


On Mon, May 29, 2017 at 12:02 PM, Matt <[hidden email]> wrote:
Hi Till,

Have you found anything or are you still busy with the release? I have no idea what may be wrong, but let me know if I can help you in any way to find what may be going on.

Best,
Matt

On Wed, May 24, 2017 at 5:37 AM, Till Rohrmann <[hidden email]> wrote:
Hi Matt,

sorry for not coming back to you sooner. We're currently in the release phase and this consumes a lot of capacities.

I tried to go to the linked repo, but Github tells me that it does not exist. Have you removed it?

Cheers,
Till

On Wed, May 17, 2017 at 10:56 PM, Matt <[hidden email]> wrote:
Check the repo at [1].

The important step which I think is what you missed is running an Ignite node on your computer so the Java code, which launches an Ignite client on the JVM, connects to it and executes Flink on that node on a local environment.

Be aware "peerClassLoadingEnabled" should be enabled (as in ignite.xml), because it must match the config on the client node.

If you follow the Readme file it's everything there, if you have any problem let me know!

Cheers,
Matt


On Wed, May 17, 2017 at 3:49 PM, Matt <[hidden email]> wrote:
Thanks for your help Till.

I will create a self contained test case in a moment and send you the link, wait for it.

Cheers,
Matt

On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <[hidden email]> wrote:

Hi Matt,

alright, then we have to look into it again. I tried to run your example, however, it does not seem to be self-contained. Using Ignite 2.0.0 with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in Ignite#start. In the logs I see the following warning:

May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.

However, I assume that this is not critical.

Maybe you can tell me how I can run your example in order to debug it.

Cheers,
Till


On Mon, May 15, 2017 at 10:05 PM, Matt <[hidden email]> wrote:
Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning) and I still find the same problem as before. If I'm not wrong your PR was merged already, so your fixes should be part of the binary.

I hope you have time to have a look at the test case in [1].

Best,
Matt


On Thu, Apr 27, 2017 at 10:09 AM, Matt <[hidden email]> wrote:
Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only processes the elements stored on the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.

Any insight on this?

Cheers,
Matt


On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <[hidden email]> wrote:
I just copied my response because my other email address is not accepted on the user mailing list.

Hi Matt,

I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.

As a side note, by doing what you're doing, you will spawn on each Ignite node a new Flink mini cluster. These mini cluster won't communicate with each other and run independently. Is this what you intend to do?


Cheers,
Till

On Wed, Apr 26, 2017 at 11:12 PM, Matt <[hidden email]> wrote:
Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <[hidden email]> wrote:
Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):

 try {
JobClient.submitJobAndWait(
clientActorSystem,
configuration,
leaderRetrievalService,
jobGraph,
timeout,
printUpdates,
this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
shutdownJobClientActorSystem(clientActorSystem)
}
}

This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is changing this line in a custom Flink build, changing line 538 for example to Thread.currentThread().getContextClassloader() and ensuring that the context classloader ins the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.

I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints how your use case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

Am 25.04.2017 um 22:50 schrieb Matt <[hidden email]>:

I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.

In summary the error I'm getting is this:

---
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
---

But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.

Not sure what is wrong.

On Tue, Apr 25, 2017 at 5:38 PM, Matt <[hidden email]> wrote:
Hi Stefan,

Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.

Here are the results of the additional tests you mentioned:

1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
3. I'm not sure what you meant in this point, is it something like what I tried in line #56?

Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"

Looks like Flink is not using the correct ClassLoader. Any idea?

Regards,
Matt

On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable); . Which classes are shipped and how is the classpath build in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test . You could try three things to analyze to problem a little more:

1) Create another inner class inside Test and try if you are still able to instantiate also this class via reflection.
2) Let Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.

Best,
Stefan

Am 25.04.2017 um 07:27 schrieb Matt <[hidden email]>:

Hi all,

I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).

Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].

If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?

It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?

Any fix or workaround is appreciated!

Best,
Matt

[1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215