ClassNotFoundException in custom SourceFunction

romain.jln
Hi all,

I am experiencing some problems with a custom source that I have
implemented. I am randomly getting ClassNotFoundExceptions during the
execution of the job, even though the fat jar submitted to Flink contains
the given classes.

After several hours of investigation, I think I have been able to narrow
down the potential cause.

Currently, it seems that the thread executing the cancel method of my source
function is not using the normal FlinkUserCodeClassLoader but rather the
AppClassLoader which seems to cause those ClassNotFoundExceptions.

I wanted to know if this behaviour was expected or if it was actually a bug.

Thanks !

PS: I am using Flink Version: 1.3.2

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: ClassNotFoundException in custom SourceFunction

Tugdual Grall
ok

Re: ClassNotFoundException in custom SourceFunction

Fabian Hueske-2
Hi,

A ClassNotFoundException should not be expected behavior.
Can you post the stacktrace of the exception?

We had a few issues in the past where Flink didn't use the correct classloader.
So this would not be an unusual bug.

Thanks,
Fabian

Re: ClassNotFoundException in custom SourceFunction

romain.jln
Hi,

The stack trace is usually something like :

Exception in thread "Thread-2" java.lang.NoClassDefFoundError: com/microsoft/azure/eventhubs/MessageReceiver$10
        at com.microsoft.azure.eventhubs.MessageReceiver.scheduleLinkCloseTimeout(MessageReceiver.java:574)
        at com.microsoft.azure.eventhubs.MessageReceiver.onClose(MessageReceiver.java:641)
        at com.microsoft.azure.eventhubs.ClientEntity.close(ClientEntity.java:78)
        at com.microsoft.azure.eventhubs.PartitionReceiver.onClose(PartitionReceiver.java:391)
        at com.microsoft.azure.eventhubs.ClientEntity.close(ClientEntity.java:78)
        at com.microsoft.azure.eventhubs.ClientEntity.closeSync(ClientEntity.java:93)
        at commons.source.azure.eventhub.PartitionPoller.shutdown(PartitionPoller.java:80)
        at commons.source.azure.eventhub.PartitionPoller.run(PartitionPoller.java:55)
Caused by: java.lang.ClassNotFoundException: com.microsoft.azure.eventhubs.MessageReceiver$10
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 8 more

(those messages are appearing randomly in the stdout of the task managers)

For a little bit of context about this stack trace: it is related to a
custom implementation of a Flink source that connects to an Azure Eventhub.
When starting the source, I create a thread that polls messages from the
Eventhub (the PartitionPoller class). This thread is created and started in
the open method of the custom source.

I checked the fat jar that I am uploading to Flink using the web API and the
given class is correctly located at the given path.

It is not always the same class that is missing. It can also be
com.microsoft.azure.eventhubs.ExceptionUtil,
com.microsoft.azure.eventhubs.amqp.AmqpErrorCode, or other classes of the
same library. All of those classes are correctly located in the fat jar.

I kept on investigating the issue and here are the first results I got :

Using Thread.currentThread().getContextClassLoader(), I can see that, when
manually cancelling the job (via the web API), the class of the ClassLoader
is sun.misc.Launcher$AppClassLoader instead of FlinkUserCodeClassLoader
(which may explain some of the ClassNotFoundExceptions).

However, when Flink automatically cancels the source (because of an error
during the execution of the job), it correctly uses a
FlinkUserCodeClassLoader as expected.

When checking the ClassLoader of the thread during the call to the open
method of the source, it also correctly uses a FlinkUserCodeClassLoader.

But I still keep getting ClassNotFoundExceptions from time to time, for no
reason that is apparent to me.
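
The check described above can be sketched as a small helper, so that the same
probe can be called from the open, run and cancel methods of the source and
from the polling thread. This is only an illustration: ClassLoaderProbe,
chain and describe are hypothetical names, and the helper uses nothing beyond
the standard Thread and ClassLoader APIs.

```java
import java.util.ArrayList;
import java.util.List;

class ClassLoaderProbe {

    /** Returns the class names of the given loader and all of its parents, child first. */
    static List<String> chain(ClassLoader loader) {
        List<String> names = new ArrayList<>();
        for (ClassLoader cl = loader; cl != null; cl = cl.getParent()) {
            names.add(cl.getClass().getName());
        }
        // A null parent stands for the bootstrap class loader.
        names.add("<bootstrap>");
        return names;
    }

    /** One-line description of the calling thread and its context class loader chain. */
    static String describe(String where) {
        Thread t = Thread.currentThread();
        return where + " [thread=" + t.getName() + "] context loader chain: "
                + chain(t.getContextClassLoader());
    }

    public static void main(String[] args) {
        System.out.println(describe("main"));
    }
}
```

Logging describe("cancel") and describe("poller") side by side should show
whether the two threads really see different loaders, and whether the user
code loader appears anywhere in the parent chain.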

EDIT: Correction of the stack trace.

Re: ClassNotFoundException in custom SourceFunction

Fabian Hueske-2
Hi,

Thanks a lot for investigating this problem and for sharing the results.
This looks like a bug to me. I'm CCing Aljoscha who knows the internals of the DataStream API very well.

Which Flink version are you using?

Would you mind creating a JIRA issue [1] with all the info you provided so far?

Thank you,
Fabian

[1] https://issues.apache.org/jira/projects/FLINK/summary

2017-12-08 11:27 GMT+01:00 romain.jln <[hidden email]>:
Hi,

The stack trace is usually something like :

Exception in thread "Thread-49" java.lang.NoClassDefFoundError: com/microsoft/azure/eventhubs/amqp/AmqpErrorCode
        at com.microsoft.azure.eventhubs.ExceptionUtil.toException(ExceptionUtil.java:30)
        at com.microsoft.azure.eventhubs.MessageSender.onClose(MessageSender.java:376)
        at com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:76)
        at com.microsoft.azure.eventhubs.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:47)
        at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
        at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
        at com.microsoft.azure.eventhubs.MessagingFactory$RunReactor.run(MessagingFactory.java:404)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.microsoft.azure.eventhubs.amqp.AmqpErrorCode
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 10 more

(those messages are appearing randomly in the stdout of the task managers)

For a little bit of context about to this stack trace, it is related to a
custom implementation of a Flink Source that connects to an Azure Eventhub.
When starting an Eventhub client, the Eventhub library creates a Reactor
thread for managing the AMQP messages (proton library). This thread is
created in the Open function of the custom source.

I checked the fat jar that I am uploading to Flink using the web API and the
given class is correctly located at the given path.

It is not always the same class that is missing. It can also be
com.microsoft.azure.eventhubs.ExceptionUtil,
com.microsoft.azure.eventhubs.MessageReceiver$10, or other classes of the
same package. All of those classes are correctly located in the fat jar.

I kept on investigating the issue and here are the first results I got :

Using Thread.currentThread().getContextClassLoader(), I can see that, when
manually cancelling the job (via the web API), the class of the ClassLoader
is sun.misc.Launcher$AppClassLoader instead of FlinkUserCodeClassLoader
(which can explain some of the ClassNotFoundException)

However, when Flink automatically cancels the source (because of an error
during the execution of the job), it correctly uses a
FlinkUserCodeClassLoader as expected.

When checking the ClassLoader of the thread during the call to the Open
method of the source, it also correctly uses a FlinkUserCodeClassLoader.

But I still keep on getting some ClassNotFoundException from time to time
for no apparent reason to me.

Re: ClassNotFoundException in custom SourceFunction

Aljoscha Krettek
Hi,

Is the code that is throwing the exception trying to use the Thread Context ClassLoader? If yes, that might explain it, because a Thread that you create will not have the correct ClassLoader set.
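
If the context ClassLoader does turn out to matter, one option is to pin it
explicitly on any thread the source creates, since a new Java thread otherwise
inherits the context ClassLoader of whichever thread created it. A minimal
sketch follows; PollerThreadSketch and runWithLoader are hypothetical names,
and the loader passed in is a stand-in. Inside a Flink function, a plausible
candidate would be getClass().getClassLoader() of the function class, on the
assumption that the user code ClassLoader loaded that class, or what I believe
is RuntimeContext#getUserCodeClassLoader() (please check it against your Flink
version).

```java
import java.util.concurrent.atomic.AtomicReference;

class PollerThreadSketch {

    /** Starts a thread pinned to the given loader and returns the loader that thread observed. */
    static ClassLoader runWithLoader(ClassLoader userCodeLoader) {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread poller = new Thread(
                () -> seen.set(Thread.currentThread().getContextClassLoader()),
                "partition-poller");
        // Pin the loader explicitly instead of inheriting it from the creating thread.
        poller.setContextClassLoader(userCodeLoader);
        poller.start();
        try {
            poller.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // Stand-in for the loader that loaded the user function class.
        ClassLoader standIn = PollerThreadSketch.class.getClassLoader();
        System.out.println(runWithLoader(standIn) == standIn);
    }
}
```

This only covers threads created directly by the source. Threads created
inside a third-party library inherit their loader from whichever thread
triggers their creation.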

Best,
Aljoscha

Re: ClassNotFoundException in custom SourceFunction

romain.jln
Hi,

FYI, I edited my message on the Nabble archive website because I realised I
sent the wrong stack trace at first (but I don't know if you've noticed the
modification). The first one was actually related to a custom sink function
that sends data to the Eventhub (I am not sure whether the two issues are
related).

The one related to the source is the following:

Exception in thread "Thread-2" java.lang.NoClassDefFoundError: com/microsoft/azure/eventhubs/MessageReceiver$10
        at com.microsoft.azure.eventhubs.MessageReceiver.scheduleLinkCloseTimeout(MessageReceiver.java:574)
        at com.microsoft.azure.eventhubs.MessageReceiver.onClose(MessageReceiver.java:641)
        at com.microsoft.azure.eventhubs.ClientEntity.close(ClientEntity.java:78)
        at com.microsoft.azure.eventhubs.PartitionReceiver.onClose(PartitionReceiver.java:391)
        at com.microsoft.azure.eventhubs.ClientEntity.close(ClientEntity.java:78)
        at com.microsoft.azure.eventhubs.ClientEntity.closeSync(ClientEntity.java:93)
        at commons.source.azure.eventhub.PartitionPoller.shutdown(PartitionPoller.java:80)
        at commons.source.azure.eventhub.PartitionPoller.run(PartitionPoller.java:55)
Caused by: java.lang.ClassNotFoundException: com.microsoft.azure.eventhubs.MessageReceiver$10
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 8 more


I don't know if I understand exactly what you mean by "trying to use the
Thread Context ClassLoader" (I do not have deep knowledge of ClassLoaders),
but I am not explicitly calling any method on the Thread Context ClassLoader,
and going through the different methods of the stack trace I did not notice
any such call either.
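
For what it is worth, the "Caused by" frames above go straight from
ClassLoader.loadClass to URLClassLoader.findClass, which would be consistent
with ordinary JVM linking rather than a lookup through the context
ClassLoader: when a class references another class, the JVM resolves the
reference through the loader that defined the referencing class. The sketch
below illustrates that rule in isolation. DefiningLoaderDemo and LazyHelper
are hypothetical names, and the example has nothing Flink-specific in it.

```java
class DefiningLoaderDemo {

    /** Nested helper; HotSpot typically loads it lazily, on first use. */
    static class LazyHelper {
    }

    /** Returns true if LazyHelper resolves even though the context loader cannot find it. */
    static boolean resolvesWithoutContextLoader() {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        // A loader with no parent and no classpath: it cannot find any application class.
        ClassLoader empty = new ClassLoader((ClassLoader) null) { };
        current.setContextClassLoader(empty);
        try {
            // Resolved through DefiningLoaderDemo's defining loader, not the context loader.
            LazyHelper helper = new LazyHelper();
            return helper.getClass().getClassLoader()
                    == DefiningLoaderDemo.class.getClassLoader();
        } finally {
            // Always restore the previous context loader.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolvesWithoutContextLoader());
    }
}
```

If that is what happens here, the loader worth inspecting would be the one
returned by MessageReceiver.class.getClassLoader() at the time of the failure,
in addition to the context ClassLoader of the failing thread.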

As I explained at first, within the source I basically create a thread
that is constantly polling messages from the Eventhub (like the
KafkaConsumerThread does with Kafka) using Microsoft's library. Under the
hood, Microsoft's library uses ThreadPoolExecutor and the proton library.

The whole source works fine except that I get those ClassNotFoundException
from time to time with no real apparent reason.

I use Flink Version: 1.3.2

Best,
Romain



Re: ClassNotFoundException in custom SourceFunction

Aljoscha Krettek
Hi,

Is it possible to go in there with a debugger and see where exactly the code is invoking the ClassLoader?

Best,
Aljoscha

Re: ClassNotFoundException in custom SourceFunction

romain.jln
Hi,

The problem is that most of the exceptions appear when my job has been
running for some hours.

The only way for me to reproduce some of those errors is by using the web UI
and hitting the cancel button of my job. So if I can find a way to generate
this action locally in a test, maybe I can use a debugger to see where the
code is invoking the ClassLoader.
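
One rough way to approximate that locally might be to call cancel from a
separate thread whose context ClassLoader is the application ClassLoader, as
observed on the cluster. The harness below is only a sketch under that
assumption: CancelHarness, Cancellable and cancelFromForeignThread are
hypothetical names, and Cancellable merely stands in for the cancel method of
the custom source.

```java
import java.util.concurrent.atomic.AtomicReference;

class CancelHarness {

    /** Hypothetical stand-in for the cancel() method of the custom source. */
    interface Cancellable {
        void cancel();
    }

    /**
     * Calls cancel() on a fresh thread whose context class loader is the system
     * (application) class loader, and returns whatever cancel() threw, or null.
     */
    static Throwable cancelFromForeignThread(Cancellable source) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread canceller = new Thread(() -> {
            try {
                source.cancel();
            } catch (Throwable t) {
                // Capture errors such as NoClassDefFoundError for inspection.
                failure.set(t);
            }
        }, "cancel-harness");
        canceller.setContextClassLoader(ClassLoader.getSystemClassLoader());
        canceller.start();
        try {
            canceller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable t = cancelFromForeignThread(() -> { });
        System.out.println(t == null ? "cancel() completed" : "cancel() failed: " + t);
    }
}
```

In a plain local test every class sits on the application classpath, so this
alone would probably not surface a missing class. To get closer to the
cluster setup, the job classes would have to be loaded through a separate
child ClassLoader, as Flink does with the submitted jar.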

But in the case of the stack trace I sent you, when going into the source
code of the Eventhub library, it turns out that the class causing the
exception is actually an anonymous class implementing the standard Runnable
interface.
The cluster is running on a Linux distribution, so I thought at first that,
since Linux cleans up the tmp directory from time to time, this could be the
reason. But since then, I restarted the cluster with the configuration
options taskmanager.tmp.dirs and jobmanager.web.tmpdir set to another
directory, and that did not seem to solve the issue.
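
As background on the anonymous class: the Java compiler emits each anonymous
class as its own class file, named after the enclosing class plus a number,
which is where a name like MessageReceiver$10 comes from. Such a class is
normally loaded only when the code that instantiates it first runs, which
might be why a failure on a close path can show up hours into the job. A small
sketch of the naming, with AnonymousNamingDemo as a hypothetical class name:

```java
class AnonymousNamingDemo {

    /** Returns the binary name the compiler gave to an anonymous Runnable. */
    static String anonymousRunnableName() {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                // Intentionally empty; only the generated class name matters here.
            }
        };
        return r.getClass().getName();
    }

    public static void main(String[] args) {
        // Prints a name of the form AnonymousNamingDemo$<number>.
        System.out.println(anonymousRunnableName());
    }
}
```

Because the class lives in its own file inside the jar, the ClassLoader has to
be able to read the jar at the moment of first use, not only at job start.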

Best,
Romain


