JobManager is no longer reachable


JobManager is no longer reachable

Flavio Pompermaier
Hi to all,

I'm restarting the discussion about a problem I already discussed on this mailing list (though it started under a different subject).
I'm running Flink 0.9.0 on CDH 5.1.3 so I compiled the sources as:

mvn clean install -Dhadoop.version=2.3.0-cdh5.1.3 -Dhbase.version=0.98.1-cdh5.1.3 -Dhadoop.core.version=2.3.0-mr1-cdh5.1.3 -DskipTests -Pvendor-repos

The problem I'm facing is that the cluster starts successfully, but when I run my job (from the web client) I get, after some time, this exception:

16:35:41,636 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@192.168.234.83:6123]
16:35:46,605 INFO  org.apache.flink.runtime.taskmanager.TaskManager   - Disconnecting from JobManager: JobManager is no longer reachable
16:35:46,614 INFO  org.apache.flink.runtime.taskmanager.TaskManager   - Cancelling all computations and discarding all cached data.
16:35:46,644 INFO  org.apache.flink.runtime.taskmanager.Task                 - Attempting to fail task externally CHAIN GroupReduce (GroupReduce at compactDataSources(MyClass.java:213)) -> Combine(Distinct at compactDataSources(MyClass.java:213)) (8/36)
16:35:46,669 INFO  org.apache.flink.runtime.taskmanager.Task                 - CHAIN GroupReduce (GroupReduce at compactDataSources(MyClass.java:213)) -> Combine(Distinct at compactDataSources(MyClass.java:213)) (8/36) switched to FAILED with exception.
java.lang.Exception: Disconnecting from JobManager: JobManager is no longer reachable
        at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerDisconnect(TaskManager.scala:741)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$receiveWithLogMessages$1.applyOrElse(TaskManager.scala:267)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:114)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
16:35:46,767 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code CHAIN GroupReduce (GroupReduce at compactDataSources(MyClass.java:213)) -> Combine(Distinct at compactDataSources(MyClass.java:213)) (8/36) (57a0ad78726d5ba7255aa87038250c51).

The job runs correctly from the IDE (Eclipse), however. How can I figure out and debug what's wrong?

Best,
Flavio


Re: JobManager is no longer reachable

Stephan Ewen
Hi Flavio!

Can you post the JobManager's log here? It should have the message about what is going wrong...

Stephan





Re: JobManager is no longer reachable

Stephan Ewen
Hi Flavio!

I had a look at the logs. There seems to be nothing suspicious: at some point, the TaskManager and JobManager simply declare each other unreachable.

A pretty common cause for this is that a JVM stalls for a long time due to garbage collection. The JobManager cannot tell the difference between a JVM that is unresponsive (due to garbage collection) and a JVM that is dead.

Here is what you can do to prevent long garbage collection stalls:

 - Don't fill the JVMs up to the limit with objects. Give more memory to the JVM, or give less memory to Flink managed memory.
 - Use more JVMs, i.e., a higher parallelism.
 - Use a concurrent garbage collector, like G1.
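For options 1 and 3, the relevant knobs live in conf/flink-conf.yaml on each node. A sketch for Flink 0.9; the key names and values below are illustrative assumptions, so check them against the configuration documentation for your version:

```yaml
# conf/flink-conf.yaml (sketch for Flink 0.9; verify keys for your version)

# Option 1a: give each TaskManager JVM more heap.
taskmanager.heap.mb: 4096

# Option 1b: or reduce the share of that heap that Flink's managed
# memory claims, leaving more room for user objects (default ~0.7).
taskmanager.memory.fraction: 0.5

# Option 3: pass JVM options to the Flink processes to enable the
# G1 concurrent collector.
env.java.opts: -XX:+UseG1GC
```

After editing the file, restart the cluster so the new JVM options take effect.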


Greetings,
Stephan






Re: JobManager is no longer reachable

Flavio Pompermaier
Which file and which JVM options do I have to modify to try options 1 and 3?
  1. Don't fill the JVMs up to the limit with objects. Give more memory to the JVM, or give less memory to Flink managed memory.
  2. Use more JVMs, i.e., a higher parallelism.
  3. Use a concurrent garbage collector, like G1.
Actually, when I run the code from Eclipse I see an exception due to an error in the data (I try to read a URI that contains illegal characters), but I don't think the program reaches that point on the cluster: I don't see that exception anywhere, and the error occurs later in the code.

However, all of your options seem related to a scalability problem, where I should add more resources to complete the work, while the job works locally in the IDE, where I have fewer resources (except for the GC, where I use default settings locally and don't know what defaults the cluster uses). Isn't that strange?







Re: JobManager is no longer reachable

Flavio Pompermaier
I think there's actually an exception thrown within my code that is not reported anywhere. Could that be?







Re: JobManager is no longer reachable

Stephan Ewen
Exceptions are swallowed upon canceling (because canceling usually produces follow-up exceptions).

Root error cause exceptions should never be swallowed.

Do you have a specific place in mind where that happens?

On Mon, Jun 29, 2015 at 4:49 PM, Flavio Pompermaier <[hidden email]> wrote:
I think that actually there's an Exception thrown within the code that I suspect it's not reported anywhere..could it be?

On Mon, Jun 29, 2015 at 3:28 PM, Flavio Pompermaier <[hidden email]> wrote:
Which file and which JVM options do I have to modify to try options 1 and 3..? 
  1. Don't fill the JVMs up to the limit with objects. Give more memory to the JVM, or give less memory to Flink managed memory
  2. Use more JVMs, i.e., a higher parallelism
  3. Use a concurrent garbage collector, like G1
Actually, when I run the code from Eclipse I see an exception do to an error in the data (because I try to read a URI that contains illegal characters) but I don't think the program reach that point, I don't see anywhere an exception and the error occur later on in the code..

However, all of your options seems related to a scalability problem, where I should add more resources to complete the work...while it works locally in the IDE where I have less resources (except the gc that I use default settings while I don't know if the cluster has some default ones)..isn't it strange?

On Mon, Jun 29, 2015 at 2:29 PM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

I had a look at the logs. There seems nothing suspicious - at some point, the TaskManager and JobManager declare each other unreachable.

A pretty common cause for that is that the JVMs stall for a long time due to garbage collection. The JobManager cannot see the difference between a JVM that is irresponsive (due to garbage collection) and a JVM that is dead.

Here is what you can do to prevent long garbage collection stalls:

 - Don't fill the JVMs up to the limit with objects. Give more memory to the JVM, or give less memory to Flink managed memory.
 - Use more JVMs, i.e., a higher parallelism.
 - Use a concurrent garbage collector, like G1.


Greetings,
Stephan


On Mon, Jun 29, 2015 at 12:39 PM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

Can you post the JobManager's log here? It should have the message about what is going wrong...

Stephan


On Mon, Jun 29, 2015 at 11:43 AM, Flavio Pompermaier <[hidden email]> wrote:
Hi to all,

I'm restarting the discussion about a problem I alredy dicussed on this mailing list (but that started with a different subject).
I'm running Flink 0.9.0 on CDH 5.1.3 so I compiled the sources as:

mvn clean  install -Dhadoop.version=2.3.0-cdh5.1.3 -Dhbase.version=0.98.1-cdh5.1.3 -Dhadoop.core.version=2.3.0-mr1-cdh5.1.3 -DskipTests -Pvendor-repos

The problem I'm facing is that the cluster start successfully but when I run my job (from the web-client) I get, after some time, this exception:

16:35:41,636 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@192.168.234.83:6123]
16:35:46,605 INFO  org.apache.flink.runtime.taskmanager.TaskManager   - Disconnecting from JobManager: JobManager is no longer reachable
16:35:46,614 INFO  org.apache.flink.runtime.taskmanager.TaskManager   - Cancelling all computations and discarding all cached data.
16:35:46,644 INFO  org.apache.flink.runtime.taskmanager.Task                 - Attempting to fail task externally CHAIN GroupReduce (GroupReduce at compactDataSources(MyClass.java:213)) -> Combine(Distinct at compactDataSources(MyClass.java:213)) (8/36)
16:35:46,669 INFO  org.apache.flink.runtime.taskmanager.Task                 - CHAIN GroupReduce (GroupReduce at compactDataSources(MyClass.java:213)) -> Combine(Distinct at compactDataSources(MyClass.java:213)) (8/36) switched to FAILED with exception.
java.lang.Exception: Disconnecting from JobManager: JobManager is no longer reachable
        at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerDisconnect(TaskManager.scala:741)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$receiveWithLogMessages$1.applyOrElse(TaskManager.scala:267)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:114)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
16:35:46,767 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code CHAIN GroupReduce (GroupReduce at compactDataSources(MyClass.java:213)) -> Combine(Distinct at compactDataSources(MyClass.java:213)) (8/36) (57a0ad78726d5ba7255aa87038250c51).

The same job, however, runs correctly from the IDE (Eclipse). How can I debug what's going wrong?

Best,
Flavio







Re: JobManager is no longer reachable

Flavio Pompermaier
I think the problem is that the error was caused by a class logging through java.util.logging, and to get those logs working I had to put SLF4JBridgeHandler.install() at the beginning of main().
This should probably be documented.. actually I don't know why this worked :)
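For illustration, here is a stdlib-only sketch of what that install() call effectively does: it removes the default handlers from the java.util.logging root logger and registers one that forwards every record onward. The real SLF4JBridgeHandler forwards to SLF4J; this sketch just collects the messages in a list so it runs without extra dependencies, and the logger name and message are made up for the example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class JulBridgeSketch {

    /** Collects everything published via java.util.logging
     *  (SLF4JBridgeHandler would forward this to SLF4J instead). */
    static final List<String> forwarded = new ArrayList<>();

    static void install() {
        Logger root = Logger.getLogger("");      // the JUL root logger
        for (Handler h : root.getHandlers()) {
            root.removeHandler(h);               // drop the default console handler,
        }                                        // like removeHandlersForRootLogger()
        root.addHandler(new Handler() {
            @Override public void publish(LogRecord record) {
                forwarded.add(record.getLevel() + ": " + record.getMessage());
            }
            @Override public void flush() {}
            @Override public void close() {}
        });
        root.setLevel(Level.INFO);
    }

    public static void main(String[] args) {
        install();   // must run before any library code logs
        // a dependency used inside a map/mapPartition function logging via JUL:
        Logger.getLogger("some.library").info("record failed: illegal URI");
        System.out.println(forwarded);   // prints: [INFO: record failed: illegal URI]
    }
}
```

Until something like this is installed, records from third-party code that uses java.util.logging go to JUL's own default handlers (or nowhere), which is why those errors never showed up in the SLF4J-backed TaskManager logs.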

On Tue, Jul 21, 2015 at 3:29 PM, Stephan Ewen <[hidden email]> wrote:
Exceptions are swallowed upon canceling (because canceling usually produces follow-up exceptions).

Root error cause exceptions should never be swallowed.

Do you have a specific place in mind where that happens?

On Mon, Jun 29, 2015 at 4:49 PM, Flavio Pompermaier <[hidden email]> wrote:
I think there's actually an Exception thrown within the code that, I suspect, isn't reported anywhere.. could that be?

On Mon, Jun 29, 2015 at 3:28 PM, Flavio Pompermaier <[hidden email]> wrote:
Which file and which JVM options do I have to modify to try options 1 and 3?
  1. Don't fill the JVMs up to the limit with objects. Give more memory to the JVM, or give less memory to Flink managed memory
  2. Use more JVMs, i.e., a higher parallelism
  3. Use a concurrent garbage collector, like G1
Actually, when I run the code from Eclipse I see an exception due to an error in the data (because I try to read a URI that contains illegal characters), but I don't think the program reaches that point on the cluster: I don't see an exception anywhere, and the error occurs later on in the code..

However, all of your options seem related to a scalability problem, where I should add more resources to complete the work... while the job works locally in the IDE, where I have fewer resources (except the GC, where I use the default settings, while I don't know if the cluster has its own defaults).. isn't that strange?

On Mon, Jun 29, 2015 at 2:29 PM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

I had a look at the logs. There seems to be nothing suspicious - at some point, the TaskManager and JobManager declare each other unreachable.

A pretty common cause for that is that the JVMs stall for a long time due to garbage collection. The JobManager cannot tell the difference between a JVM that is unresponsive (due to garbage collection) and a JVM that is dead.

Here is what you can do to prevent long garbage collection stalls:

 - Don't fill the JVMs up to the limit with objects. Give more memory to the JVM, or give less memory to Flink managed memory.
 - Use more JVMs, i.e., a higher parallelism.
 - Use a concurrent garbage collector, like G1.
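Options 1 and 3 above translate, roughly, into flink-conf.yaml settings and a JVM flag. The key names below are from the Flink 0.9 configuration and the values are only examples - check them against the configuration documentation for your version:

```yaml
# conf/flink-conf.yaml

# Option 1: give each TaskManager JVM more heap ...
taskmanager.heap.mb: 4096

# ... and/or reserve a smaller share of that heap for Flink's managed
# memory, leaving more room for user objects (default is 0.7)
taskmanager.memory.fraction: 0.5

# Option 3: extra JVM options for the Flink processes, e.g. enable G1
env.java.opts: -XX:+UseG1GC
```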


Greetings,
Stephan


On Mon, Jun 29, 2015 at 12:39 PM, Stephan Ewen <[hidden email]> wrote:
Hi Flavio!

Can you post the JobManager's log here? It should have the message about what is going wrong...

Stephan










Re: JobManager is no longer reachable

Stephan Ewen
Do you know which component logs via java.util.logging?










Re: JobManager is no longer reachable

Flavio Pompermaier
It was a class contained in a dependency that I call inside a mapPartition or map function..










Re: JobManager is no longer reachable

Stephan Ewen
Okay. User-code logging behavior is impossible for Flink to control.

I think the right solution is, as you did, to manually install a bridge from java.util.logging to SLF4J.
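For reference, the bridge used here lives in the jul-to-slf4j artifact; a minimal Maven setup looks roughly like this (the version property is a placeholder - it should match your slf4j-api version):

```xml
<!-- pom.xml: the JUL-to-SLF4J bridge -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>jul-to-slf4j</artifactId>
  <version>${slf4j.version}</version>
</dependency>
```

Then call SLF4JBridgeHandler.removeHandlersForRootLogger() followed by SLF4JBridgeHandler.install() at the very start of main(), before any user or library code logs.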

