Akka Http used in custom RichSourceFunction

Niels van Kaam
Hi All,

I wrote a custom source function (a RichSourceFunction) that connects to a WebSocket using the Akka HTTP library. The job using this source runs fine in a local environment, but on shutdown I see the following error in the log: "Exception in thread "main" java.lang.NoSuchMethodError: akka.actor.ActorSystem.shutdown()V"

My impression is that the issue is caused by a version conflict between Flink's Akka dependency and my own (pulled in by Akka HTTP). This seems to be related to this issue: https://issues.apache.org/jira/browse/FLINK-9240

Can I somehow avoid this conflict?
If not, does this mean I should avoid using Akka (or at least versions other than Flink's) within my sources/sinks?
Or can I safely catch and ignore the error? 

My dependencies are:
Flink: 1.4.2
akka-actor: 2.5.12
akka-stream: 2.5.12
akka-http: 10.1.1

Thank you for your help!

Cheers,
Niels



Re: Akka Http used in custom RichSourceFunction

Piotr Nowojski
Hi,

Please take a look at https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html . In particular, check whether you are using the child-first class loading configuration. If that doesn't help, you should probably shade your Akka dependency.
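
For readers unfamiliar with the term: child-first class loading means that the user-code classloader looks in the job's own jars before asking its parent, so a job can bring its own version of a library that Flink also ships. The sketch below only illustrates the idea. ChildFirstSketch is a hypothetical name, and this is not Flink's actual implementation; in Flink 1.4 the behaviour is, to my knowledge, controlled by the classloader.resolve-order setting described on the page linked above.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Illustrative child-first classloader (hypothetical, not Flink's own class). */
public class ChildFirstSketch extends URLClassLoader {

    public ChildFirstSketch(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            // Reuse the class if this loader has already defined it.
            Class<?> cls = findLoadedClass(name);
            if (cls == null) {
                if (name.startsWith("java.")) {
                    // JDK classes always come from the parent.
                    cls = super.loadClass(name, false);
                } else {
                    try {
                        // Child first: look in this loader's own URLs.
                        cls = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not in the user jars: fall back to the parent.
                        cls = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(cls);
            }
            return cls;
        }
    }
}
```

Note that this only helps when user code is loaded through such a separate classloader. Classes that are already on the JVM's application classpath are resolved by the parent either way.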

What is the full exception? Is it thrown when YOUR code tries to shut down, or when FLINK's code tries to shut down?

Piotrek

On 24 May 2018, at 14:38, Niels van Kaam <[hidden email]> wrote:


Re: Akka Http used in custom RichSourceFunction

Niels van Kaam
Hi Piotrek,

Thank you for your response!

I am currently just testing the job in a local environment. I think that means all classes are on the Java classpath, which might then also be the issue.
If I am correct, that means I am currently not using dynamic class loading and am simply overriding the Akka version, for Flink as well.

I will try moving my WebSocket connector to a separate package and shading its Akka dependency.


The full stack trace of the exception (I think this is the shutdown of the Flink minicluster):

Exception in thread "main" java.lang.NoSuchMethodError: akka.actor.ActorSystem.shutdown()V
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:482)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:434)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:112)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
    at net.vankaam.flink.WebSocketSample$.main(WebSocketSample.scala:42)
    at net.vankaam.flink.WebSocketSample.main(WebSocketSample.scala)
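
One way to check this hypothesis is to ask the JVM which jar akka.actor.ActorSystem was actually loaded from, and whether that class still has a no-argument shutdown() method. As far as I know, shutdown() was deprecated in Akka 2.4 in favour of terminate() and removed in 2.5, which would explain the NoSuchMethodError if Flink 1.4.2 was compiled against the older API. A minimal sketch; ClassOriginCheck and its helper names are made up for illustration:

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

/** Hypothetical diagnostic: where does a class come from, and which methods does it have? */
public class ClassOriginCheck {

    /** Returns the jar or directory the class was loaded from, or a placeholder if unknown. */
    public static String originOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap or unknown>";
        }
        return source.getLocation().toString();
    }

    /** True if the class exposes a public no-argument method with the given name. */
    public static boolean hasNoArgMethod(Class<?> cls, String name) {
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace; pass another name to inspect it instead.
        String className = args.length > 0 ? args[0] : "akka.actor.ActorSystem";
        try {
            Class<?> cls = Class.forName(className);
            System.out.println(className + " loaded from " + originOf(cls));
            System.out.println("has shutdown():  " + hasNoArgMethod(cls, "shutdown"));
            System.out.println("has terminate(): " + hasNoArgMethod(cls, "terminate"));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```

Run with the same classpath as the job. If the reported jar is the newer akka-actor and shutdown() is missing, Flink's mini cluster is resolving my Akka version instead of its own.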


Cheers,
Niels


On Thu, May 24, 2018 at 4:08 PM Piotr Nowojski <[hidden email]> wrote:

Re: Akka Http used in custom RichSourceFunction

Piotr Nowojski
Hi,

Yes, this might be the cause of the issue: it does look like your Akka version is leaking into Flink's classloader.

Piotrek

On 25 May 2018, at 09:40, Niels van Kaam <[hidden email]> wrote:


Re: Akka Http used in custom RichSourceFunction

Niels van Kaam
Hi,

That was indeed the problem, and shading my Akka dependency solved it. Thank you for pointing that out!

For reference:

When shading Akka you also need to merge Akka's reference.conf files, or the job will fail. This page contains useful documentation on how to do that: https://doc.akka.io/docs/akka/2.5.12/general/configuration.html#When_using_JarJar__OneJar__Assembly_or_any_jar-bundler
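
The reason is that each Akka module jar ships its own reference.conf with that module's default settings, so a bundler that keeps only one of them silently drops the others. The linked page describes the build-side fix (for example appending the files with the Maven Shade plugin's AppendingTransformer, or concatenating them in sbt-assembly). To see what a given classpath actually contains, a small check like the following can help. It is only a sketch, and ReferenceConfCheck is a hypothetical name:

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Hypothetical diagnostic: list every copy of a resource visible on the classpath. */
public class ReferenceConfCheck {

    /** Returns the URLs of all resources with the given name that the classloader can see. */
    public static List<URL> findResources(String name) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        return Collections.list(loader.getResources(name));
    }

    public static void main(String[] args) throws IOException {
        List<URL> found = findResources("reference.conf");
        System.out.println(found.size() + " reference.conf file(s) visible:");
        for (URL url : found) {
            System.out.println("  " + url);
        }
    }
}
```

With the individual Akka jars on the classpath I would expect one entry per jar that ships a reference.conf. With a single fat jar there should be one entry, and that file needs to contain the merged settings of all modules.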


Niels



On Fri, May 25, 2018 at 11:00 AM Piotr Nowojski <[hidden email]> wrote:

Re: Akka Http used in custom RichSourceFunction

Piotr Nowojski
Thanks for getting back, and I'm glad that you were able to resolve your issue :)

Piotrek

On 25 May 2018, at 11:25, Niels van Kaam <[hidden email]> wrote:
