Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

Miguel Coimbra
Hello,

I am running into a situation where the Flink threads responsible for my operator execution are all stuck in the WAITING state.
Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of memory, so perhaps that may be triggering some execution edge-case?

Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)


Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with LocalEnvironment on this large-memory machine, with parallelism set to one:

Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential jobs (any data dependency between jobs is flushed to disk on job i and read back from disk into a DataSet in job i + 1).
To reiterate, I am not launching a Flink cluster, I am just executing in local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different heap sizes (from -Xmx20000m to -Xmx120000m, i.e. roughly 20 GB to 120 GB) and the result is always the same: the process's memory fills up completely and then its CPU usage drops to 0%.
This is strange because if it were a lack of memory, I would expect an OutOfMemoryError.

I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized quite a few operator threads are stuck on java.lang.Thread.State: WAITING.
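For reference, the same kind of information can be collected from inside the JVM without an IDE. The following is only a minimal sketch using the standard Thread.getAllStackTraces() call; the class WaitingThreadReport and its method names are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Flink): reports threads in the WAITING state.
final class WaitingThreadReport {

    // One entry per WAITING thread: "<thread name> @ <innermost stack frame>".
    static List<String> waitingThreads() {
        List<String> report = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (thread.getState() != Thread.State.WAITING) {
                continue;
            }
            StackTraceElement[] stack = entry.getValue();
            String top = stack.length > 0 ? stack[0].toString() : "<no frames>";
            report.add(thread.getName() + " @ " + top);
        }
        return report;
    }

    // Starts a daemon thread that parks in Object.wait() and returns once it is WAITING.
    static Thread startWaitingThread(String name) {
        final Object lock = new Object();
        Thread thread = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait(); // nobody ever calls notify(), so this never returns
                    }
                } catch (InterruptedException ignored) {
                    // demo thread: nothing to clean up
                }
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
        while (thread.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        return thread;
    }

    public static void main(String[] args) {
        startWaitingThread("demo-operator");
        for (String line : waitingThreads()) {
            System.out.println(line);
        }
    }
}
```

Running jstack against the process ID should give an equivalent full dump from the command line.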

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

Number 1:

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)


Number 2:

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 3:

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 4:

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291)
      at java.lang.Thread.run(Thread.java:748)


While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
But if it were resource scarcity, I believe the program would terminate with an exception.
And if it were garbage collection activity, I believe the JVM process would not be at 0% CPU usage.

Note: I realize I didn't provide the user code that generates the execution plan for Flink and led to the contexts in which the threads are waiting, but I hope it may not be necessary.
My problem now is that I am unsure how to proceed to further debug this issue:
- The assigned memory is fully used, but there are no exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal they're waiting for exactly.
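One thing that might help narrow down the memory observation: the memory the operating system reports for the process reflects what the JVM has committed, which is not necessarily what the heap currently uses. The sketch below reads both figures through the standard MemoryMXBean; the class name HeapReport is hypothetical, and whether the two figures actually differ in this job is an assumption that would need to be checked.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper: compares used, committed and maximum heap sizes.
final class HeapReport {

    static String describeHeap() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long mb = 1024L * 1024L;
        // used      = bytes currently occupied by objects (live or not yet collected)
        // committed = bytes the JVM has obtained from the OS for the heap
        // max       = upper bound, normally the -Xmx value (-1 if undefined)
        return String.format("heap used=%d MB, committed=%d MB, max=%d MB",
                heap.getUsed() / mb, heap.getCommitted() / mb, heap.getMax() / mb);
    }

    public static void main(String[] args) {
        System.out.println(describeHeap());
    }
}
```

If the used figure stays well below the maximum while the process looks full from the outside, an OutOfMemoryError would not be expected.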

Hoping anyone might be able to give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

Chesnay Schepler
Hello,

Threads #1-3 are waiting for input; thread #4 is waiting for the job to finish.

To further debug this I would look into what the preceding operators are doing, whether they are blocked on something or are emitting records (which you can check in the UI/metrics).
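To illustrate what "waiting for input" looks like in a thread dump, here is a deliberately simplified stand-in for an input gate. It is not Flink's SingleInputGate; the class ToyInputGate and its methods are hypothetical and only show the general wait/notify pattern in which a consumer stays in WAITING until a producer delivers something.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-in for an input gate; NOT Flink's SingleInputGate.
final class ToyInputGate {

    private final Queue<String> buffers = new ArrayDeque<>();

    // Called by the producing side: enqueue a buffer and wake up any waiting consumer.
    synchronized void add(String buffer) {
        buffers.add(buffer);
        notifyAll();
    }

    // Called by the consuming operator: blocks in Object.wait() while no input is available.
    synchronized String getNext() throws InterruptedException {
        while (buffers.isEmpty()) {
            wait();
        }
        return buffers.poll();
    }

    public static void main(String[] args) throws Exception {
        ToyInputGate gate = new ToyInputGate();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got " + gate.getNext());
            } catch (InterruptedException ignored) {
                // demo thread: nothing to clean up
            }
        }, "toy-join");
        consumer.start();

        // With no producer activity, the consumer parks in wait().
        while (consumer.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }
        System.out.println(consumer.getName() + " is " + consumer.getState());

        // As soon as the upstream side emits something, the consumer continues.
        gate.add("record-1");
        consumer.join();
    }
}
```

In this toy version the consumer is not broken, it simply has nothing to read, which is why the state of the upstream side is the interesting part.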

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

Miguel Coimbra
In reply to this post by Miguel Coimbra
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml would make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web in my pom.xml, recompiled and launched the program but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.


The connection is refused, which suggests that nothing is actually listening on localhost:8081.
I am probably skipping some important detail.
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>


Have you managed to get the web front-end working in local mode?
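A quick way to tell "nothing is listening on the port" apart from "something is listening but misbehaving" is a plain TCP connect. This is a minimal sketch using only java.net; the class name PortCheck is hypothetical, and port 8081 is simply the port tried with wget above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // "Connection refused" ends up here: typically no process is listening.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("localhost:8081 listening: " + isListening("localhost", 8081, 1000));
    }
}
```

If this returns false while the job is running, the front-end most likely was never started, which would point back at the classpath or the logged warnings rather than at the browser.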


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 05:12, Chesnay Schepler <[hidden email]> wrote:
The thing with createLocalEnvironmentWithWebUI is that it requires flink-runtime-web to be on the classpath, which is rarely the case when running things in the IDE.
It should work fine in the IDE if you add it as a dependency to your project. This should've been logged as a warning.

Chaining is unrelated to this issue as join operators are never chained to one another.
Lambda functions are also not the issue, if they were the job would fail much earlier.

It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input hence produces no output, which now also blocks T3.

There are multiple possible explanations I can come up with:
* the preceding operators are blocked on something or really slow
* the preceding operators are actually finished, but aren't shutting down due to an implementation error
* a deadlock in Flink's join logic
* a deadlock in Flink's network stack

For the first 2 we will have to consult the UI or logs. You said you were dumping the input DataSets into files, but were they actually complete?

A deadlock in the network stack should appear as all existing operator threads being blocked.
We can probably rule out a problem with the join logic by removing the second join and trying again.
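For the deadlock hypotheses, the JVM itself can report lock cycles. The sketch below uses the standard ThreadMXBean.findDeadlockedThreads() call; the class DeadlockCheck and the demo threads are hypothetical. Note the limitation: this only finds threads blocked on each other's monitors or ownable synchronizers. Threads parked in Object.wait() and waiting for a notification that never arrives would not be reported, so an empty result does not rule out a stall in the network stack.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical helper: asks the JVM for threads that are part of a lock cycle.
final class DeadlockCheck {

    static int countDeadlockedThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long[] ids = threads.findDeadlockedThreads(); // null when no lock cycle exists
        return ids == null ? 0 : ids.length;
    }

    // Demo only: starts two daemon threads that acquire two locks in opposite order.
    static void startDeadlockedPair() {
        Object a = new Object();
        Object b = new Object();
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        startLocker("demo-1", a, b, bothHoldFirstLock);
        startLocker("demo-2", b, a, bothHoldFirstLock);
    }

    private static void startLocker(String name, Object first, Object second, CountDownLatch latch) {
        Thread thread = new Thread(() -> {
            synchronized (first) {
                latch.countDown();
                try {
                    latch.await(); // make sure both threads hold their first lock
                } catch (InterruptedException e) {
                    return;
                }
                synchronized (second) {
                    // never reached: the other thread holds this lock
                }
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("before: " + countDeadlockedThreads());
        startDeadlockedPair();
        while (countDeadlockedThreads() < 2) {
            Thread.sleep(10);
        }
        System.out.println("after: " + countDeadlockedThreads());
    }
}
```

In a fresh JVM this should report 0 before and 2 after the demo pair is started. jstack prints the same kind of deadlock information at the end of its output.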



On 16.04.2018 03:10, Miguel Coimbra wrote:
Hello,

It would seem that the function which is supposed to launch local mode with the web front-end doesn't launch the front-end at all...
This function seems not to be doing what it is supposed to do, if I'm not mistaken:

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Regarding the preceding operators, the thread dumps I got were pointing to a specific set of operations over DataSet instances that were passed into my function.
Below I show the code segment, with comments marking the lines where threads are waiting:

public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
    return vertices
            .joinWithHuge(originalGraph.getEdges())
            .where(0).equalTo(0)
            .with((source, edge) -> edge) // Thread 1 is blocked here
            .returns(originalGraph.getEdges().getType())
            .join(vertices)
            .where(1).equalTo(0)
            .with((e, v) -> e) // Thread 3 is blocked here
            .returns(originalGraph.getEdges().getType())
            .distinct(0, 1);
}


Note: the edge DataSet of originalGraph contains far more elements than the vertices DataSet, so I believe joinWithHuge is being used correctly.
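To make the intent of selectEdges easier to follow, here is what the two joins and the distinct compute, written with plain Java collections instead of Flink. This is only a semantic sketch under the assumptions that vertex IDs are unique and that edge values can be ignored; the class SelectEdgesSketch is hypothetical and says nothing about how Flink executes the plan.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-collections sketch of selectEdges: keep the edges whose source AND
// target are both in the vertex set, de-duplicated on (source, target).
final class SelectEdgesSketch {

    // Each edge is a two-element list [source, target]; edge values are omitted.
    static List<List<Long>> selectEdges(List<List<Long>> edges, Set<Long> vertices) {
        Set<List<Long>> kept = new LinkedHashSet<>(); // plays the role of distinct(0, 1)
        for (List<Long> edge : edges) {
            boolean sourceSelected = vertices.contains(edge.get(0)); // first join: where(0).equalTo(0)
            boolean targetSelected = vertices.contains(edge.get(1)); // second join: where(1).equalTo(0)
            if (sourceSelected && targetSelected) {
                kept.add(edge);
            }
        }
        return new ArrayList<>(kept);
    }

    public static void main(String[] args) {
        List<List<Long>> edges = Arrays.asList(
                Arrays.asList(1L, 2L),
                Arrays.asList(2L, 3L),
                Arrays.asList(1L, 2L),  // duplicate, removed by the distinct step
                Arrays.asList(3L, 4L)); // target 4 is not selected, so the edge is dropped
        Set<Long> vertices = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        System.out.println(selectEdges(edges, vertices)); // prints [[1, 2], [2, 3]]
    }
}
```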

I will try testing with remote (cluster) mode to have access to the web front-end, but I have some questions for now:

- They are blocked in different JoinOperator instances that are chained. Is this a result of Flink's default pipelining mechanism?
- Could there be a problem stemming from the fact they are both waiting on lambdas?
- I have tried dumping both DataSet variables originalGraph and vertices into files (the ones being used in this code), and they produced correct values (non-empty files), so I don't have a clue what the threads inside Flink's runtime are waiting on.

Thanks for the help so far, Chesnay.


Miguel E. Coimbra
Email: [hidden email]


      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 4:

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291)
      at java.lang.Thread.run(Thread.java:748)


While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
But if it was resource scarcity I believe the program would terminate with an exception.
And if it was garbage collection activity, I believe the JVM process would not be at 0% CPU usage.

Note: I realize I didn't provide the user code that generates the Flink execution plan leading to the contexts in which the threads are waiting, but I hope it is not necessary.
My problem now is that I am unsure how to proceed to further debug this issue:
- The assigned memory is fully used, but there are no exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I don't understand exactly what signal they're waiting for.
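
One way to see what the waiting threads are blocked on, from inside the same JVM, is the standard java.lang.management API. The sketch below is only an illustration: the class name WaitingThreadReport and the five-frame limit are arbitrary choices and not part of Flink, and the method would have to be called from within the running program (for example from a watchdog thread). Running jstack against the process id gives similar information from outside the process.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): lists live threads in a given state.
final class WaitingThreadReport {

    /** Describes every live thread in the given state, with the object it waits on and its top frames. */
    static List<String> describeThreadsInState(Thread.State state) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        // Only request lock details the JVM says it can provide.
        ThreadInfo[] infos = threads.dumpAllThreads(
                threads.isObjectMonitorUsageSupported(),
                threads.isSynchronizerUsageSupported());
        for (ThreadInfo info : infos) {
            if (info == null || info.getThreadState() != state) {
                continue;
            }
            StringBuilder sb = new StringBuilder();
            sb.append('"').append(info.getThreadName()).append("\" ").append(state);
            if (info.getLockName() != null) {
                // The monitor or synchronizer the thread is parked on.
                sb.append(" on ").append(info.getLockName());
            }
            StackTraceElement[] stack = info.getStackTrace();
            for (int i = 0; i < Math.min(stack.length, 5); i++) {
                sb.append("\n    at ").append(stack[i]);
            }
            report.add(sb.toString());
        }
        return report;
    }

    public static void main(String[] args) {
        describeThreadsInState(Thread.State.WAITING).forEach(System.out::println);
    }
}
```

This only reports where each thread waits; it does not say which upstream operator failed to produce the data or notification the thread is waiting for.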

Hoping anyone might be able to give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra





Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

Chesnay Schepler
In reply to this post by Miguel Coimbra
Ah yes, currently when you use that method the UI is started on a random port. I'm currently fixing that in this PR, which will be merged today. For now you will have to enable logging and search for something along the lines of "http://<host>:<port> was granted leadership".

Sorry for the inconvenience.
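
Searching the log for that message can be automated. The following is a minimal sketch, assuming the log line really contains the text "http://<host>:<port> was granted leadership" as described above; the exact wording in a given Flink build may differ, and the class name WebUiAddressFinder is made up for this example.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the web UI address from log output.
final class WebUiAddressFinder {

    // Assumed message shape; adjust the pattern if the real log line differs.
    private static final Pattern LEADERSHIP =
            Pattern.compile("(https?://\\S+:\\d+)\\s+was granted leadership");

    /** Returns the address from the last matching line, if any line matches. */
    static Optional<String> findAddress(Iterable<String> logLines) {
        String last = null;
        for (String line : logLines) {
            Matcher m = LEADERSHIP.matcher(line);
            if (m.find()) {
                last = m.group(1); // keep the most recent match
            }
        }
        return Optional.ofNullable(last);
    }

    public static void main(String[] args) {
        // Reads log lines from standard input, e.g. piped from the program's output.
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        Iterable<String> lines = in.lines()::iterator;
        System.out.println(findAddress(lines).orElse("no web UI address found"));
    }
}
```

Keeping the last match rather than the first is a deliberate choice here, in case the address is logged more than once during a run.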

On 16.04.2018 15:04, Miguel Coimbra wrote:
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml would make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web  in my pom.xml, recompiled and launched the program but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.


It seems something was bound to localhost:8081 but the connection is not working for some reason.
I am probably skipping some important detail.
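
As a side note, "Connection refused" usually means that nothing is listening on that port, rather than that a bound port is failing to respond. A small probe such as the sketch below can check this for any candidate port; PortProbe is a hypothetical helper written for this example, and the one-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether something accepts TCP connections on host:port.
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused" as well as connect timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("localhost:8081 listening? " + isListening("localhost", 8081, 1000));
    }
}
```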
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>


Have you managed to get the web front-end in local mode?


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 05:12, Chesnay Schepler <[hidden email]> wrote:
The thing with createLocalEnvironmentWithWebUI is that it requires flink-runtime-web to be on the classpath, which is rarely the case when running things in the IDE.
It should work fine in the IDE if you add it as a dependency to your project. This should've been logged as a warning.

Chaining is unrelated to this issue as join operators are never chained to one another.
Lambda functions are also not the issue, if they were the job would fail much earlier.

It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input hence produces no output, which now also blocks T3.

There are multiple possible explanations I can come up with:
* the preceding operators are blocked on something or really slow
* the preceding operators are actually finished, but aren't shutting down due to an implementation error
* a deadlock in Flink's join logic
* a deadlock in Flink's network stack

For the first 2 we will have to consult the UI or logs. You said you were dumping the input DataSets into files, but were they actually complete?

A deadlock in the network stack should appear as all existing operator threads being blocked.
We can probably rule out a problem with the join logic by removing the second join and trying again.



On 16.04.2018 03:10, Miguel Coimbra wrote:
Hello,

It would seem that the function which is supposed to launch local mode with the web front-end doesn't launch the front-end at all...
This function seems not to be doing what it is supposed to do, if I'm not mistaken:

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Regarding the preceding operators, the thread dumps I got were pointing to a specific set of operations over DataSet instances that were passed into my function.
Below I show the code segment, with comments marking the lines where threads are waiting:

public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
    return vertices
            .joinWithHuge(originalGraph.getEdges())
            .where(0).equalTo(0)
            .with((source, edge) -> edge) // Thread 1 is blocked here
            .returns(originalGraph.getEdges().getType())
            .join(vertices)
            .where(1).equalTo(0)
            .with((e, v) -> e) // Thread 3 is blocked here
            .returns(originalGraph.getEdges().getType())
            .distinct(0, 1);
}


Note: the edge DataSet of originalGraph has far more elements than the vertices DataSet, so I believe joinWithHuge is being used correctly.

I will try testing with remote (cluster) mode to have access to the web front-end, but I have some questions for now:

- They are blocked in different JoinOperator instances that are chained; is this a result of Flink's default pipeline mechanism?
- Could there be a problem stemming from the fact they are both waiting on lambdas?
- I have tried dumping both DataSet variables originalGraph and vertices into files (the ones being used in this code), and they produced correct values (non-empty files), so I don't have a clue what the threads inside Flink's runtime are waiting on.

Thanks for the help so far Chesnay.


Miguel E. Coimbra
Email: [hidden email]


---------- Forwarded message ----------
From: Chesnay Schepler <[hidden email]>
To: [hidden email]
Cc: 
Bcc: 
Date: Sun, 15 Apr 2018 18:54:33 +0200
Subject: Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING
Hello,

Threads #1-3 are waiting for input; thread #4 is waiting for the job to finish.

To further debug this I would look into what the preceding operators are doing, whether they are blocked on something or are emitting records (which you can check in the UI/metrics).





Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

Miguel Coimbra
Chesnay, following your suggestions I got access to the web interface and also took a closer look at the debugging logs.
I have noticed one problem regarding the web interface port - it keeps changing port now and then during my Java program's execution.

Not sure if that is due to my program launching several job executions sequentially, but the fact is that it happened.
Since I am accessing the web interface via tunneling, it becomes rather cumbersome to keep adapting it.

Another particular problem I'm noticing is that this exception frequently pops up (debugging with log4j):

00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
        at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
        at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
        at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
        at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
        at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
        at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Don't know if the internals of Flink are explicitly using an exception for control flow, but there are several occurrences of this as time goes by.

Regarding my program itself, I've achieved some progress.
In my program I need to run a sequence of Flink jobs, and I need to take extra care to make sure no DataSet instance from job i is used in an operator in job i + 1.
I believe this was generating the waiting scenarios I described in an earlier email.
The bottom line is to be extra careful about when job executions are actually triggered, and to make sure that a DataSet which is needed in different Flink jobs is available, for example, as a file in secondary storage (possibly masked as a memory mapping) and is read exclusively from that source.
This means ensuring that the job which originally produces a DataSet (for reuse in a later job) assigns it a DataSink for secondary storage.

I'm going to keep digging with this in mind. I will report back if I manage to fix everything or find a new problem.

Thanks again,



Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 10:26, Chesnay Schepler <[hidden email]> wrote:
ah yes, currently when you use that method the UI is started on a random port. I'm currently fixing that in this PR that will be merged today. For now you will enable logging and search for something along the lines of "http://<host>:<port> was granted leadership"

Sorry for the inconvenience.

On 16.04.2018 15:04, Miguel Coimbra wrote:
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml would make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web  in my pom.xml, recompiled and launched the program but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.


It seems something was bound to localhost:8081 but the connection is not working for some reason.
I probably am skipping some important detail.
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
     <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
     <version>${flink.version}</version>
</dependency>


Have you managed to get the web front-end in local mode?


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 05:12, Chesnay Schepler <[hidden email]> wrote:
The thing with createLocalEnvironmentWithWebUI is that it requires flink-runtime-web to be on the classpath, which is rarely the class when running things in the IDE.
It should work fine in the IDE if you add it as a dependency to your project. This should've been logged as a warning.

Chaining is unrelated to this issue as join operators are never chained to one another.
Lambda functions are also not the issue, if they were the job would fail much earlier.

It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input hence produces no output, which now also blocks T3.

There are multiple possible explanations i can come up with:
* the preceding operators are blocked on something or really slow
* the preceding operators are actually finished, but aren't shutting down due to an implementation error
* a deadlock in Flink's join logic
* a deadlock in Flink's network stack

For the first 2 we will have to consult the UI or logs. You said you were dumping the input DataSets into files, but were they actually complete?

A deadlock in the network stack should appear as all existing operator threads being blocked.
We can probably rule out a problem with the join logic by removing the second join and trying again.



On 16.04.2018 03:10, Miguel Coimbra wrote:
Hello,

It would seem that the function which is supposed to launch local mode with the web front-end doesn't launch the front-end at all...
This function seems not to be doing what it is supposed to do, if I'm not mistaken:

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Regarding the preceding operators, the thread dumps I got were pointing to a specific set of operations over DataSet instances that were passed into my function.
Below I show the code segment and put the lines where threads are waiting in bold:

public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
    return vertices
            .joinWithHuge(originalGraph.getEdges())
            .where(0).equalTo(0)
            .with((source, edge) -> edge) // Thread 1 is blocked here
            .returns(originalGraph.getEdges().getType())
            .join(vertices)
            .where(1).equalTo(0)
            .with((e, v) -> e) // Thread 3 is blocked here
            .returns(originalGraph.getEdges().getType())
            .distinct(0, 1);
}


Note: the edges inside the graph originalGraph edge DataSet are much greater in number than the elements of the vertices DataSet, so I believe that function is being used correctly.

I will try testing with remote (cluster) mode to have access to the web front-end, but I have some questions for now:

- The threads are blocked in different JoinOperator instances that are chained. Is this a result of Flink's default pipelining mechanism?
- Could there be a problem stemming from the fact they are both waiting on lambdas?
- I have tried dumping both DataSet variables originalGraph and vertices into files (the ones being used in this code), and they produced correct values (non-empty files), so I don't have a clue what the threads inside Flink's runtime are waiting on.

Thanks for the help so far, Chesnay.


Miguel E. Coimbra
Email: [hidden email]


---------- Forwarded message ----------
From: Chesnay Schepler <[hidden email]>
To: [hidden email]
Cc: 
Bcc: 
Date: Sun, 15 Apr 2018 18:54:33 +0200
Subject: Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING
Hello,

Threads #1-3 are waiting for input; Thread #4 is waiting for the job to finish.

To further debug this I would look into what the preceding operators are doing, whether they are blocked on something or are emitting records (which you can check in the UI/metrics).

On 15.04.2018 18:40, Miguel Coimbra wrote:
Hello,

I am running into a situation where the Flink threads responsible for my operator execution are all stuck in the WAITING state.
Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of memory, so perhaps that may be triggering some execution edge-case?

Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)


Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with LocalEnvironment on this large-memory machine, with parallelism set to one:

Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential jobs (any data dependency between jobs is flushed to disk on job i and read back from disk into a DataSet in job i + 1).
To reiterate, I am not launching a Flink cluster, I am just executing in local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different values of memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and the result is always the same: the process' memory fills up completely and then the process' CPU usage drops to 0%.
This is strange because if it were a lack of memory, I would expect an OutOfMemoryError.

I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized quite a few operator threads are stuck on java.lang.Thread.State: WAITING.

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

Number 1:

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)


Number 2:

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 3:

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 4:

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291)
      at java.lang.Thread.run(Thread.java:748)


While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
But if it was resource scarcity I believe the program would terminate with an exception.
And if it was garbage collection activity, I believe the JVM process would not be at 0% CPU usage.
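One way to check the garbage-collection hypothesis from inside the JVM is to sample heap usage and the cumulative GC counters while the job appears stuck. The following is a minimal sketch that uses only the standard java.lang.management API; the class name HeapAndGcSnapshot and the one-second sampling interval are illustrative assumptions and not part of the original program.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper class, named only for this example.
class HeapAndGcSnapshot {

    /** Returns a one-line summary of current heap usage and cumulative GC work. */
    static String snapshot() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long gcCount = 0;
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 when the collector does not report the value.
            gcCount += Math.max(0, gc.getCollectionCount());
            gcMillis += Math.max(0, gc.getCollectionTime());
        }
        return String.format("heapUsedMB=%d heapMaxMB=%d gcCount=%d gcTimeMs=%d",
                heap.getUsed() >> 20, heap.getMax() >> 20, gcCount, gcMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        // Print a few samples; in a real program this could run on a daemon thread.
        for (int i = 0; i < 3; i++) {
            System.out.println(snapshot());
            Thread.sleep(1000);
        }
    }
}
```

If gcCount and gcTimeMs stop increasing between samples while heap usage stays flat, the collector is idle, which would be consistent with threads waiting rather than with GC pressure.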

Note: I realize I didn't provide the user code that generates the execution plan for Flink and led to the contexts in which the threads are waiting, but I hope it may not be necessary.
My problem now is that I am unsure how to proceed to further debug this issue:
- The assigned memory is fully used, but there are no exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal they're waiting for exactly.
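
The information shown in the IDE thread dumps can also be collected programmatically. This is a minimal sketch based on the standard ThreadMXBean API; the class name WaitingThreadReport is hypothetical. It lists the threads in the WAITING state and asks the JVM whether it detects a lock cycle.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, named only for this example.
class WaitingThreadReport {

    /** Names of all live threads currently in Thread.State.WAITING. */
    static List<String> waitingThreadNames() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> names = new ArrayList<>();
        // dumpAllThreads(false, false): skip monitor and synchronizer details.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info != null && info.getThreadState() == Thread.State.WAITING) {
                names.add(info.getThreadName());
            }
        }
        return names;
    }

    /** True if the JVM itself reports a cycle of threads blocked on locks. */
    static boolean hasJvmLevelDeadlock() {
        long[] ids = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
        return ids != null && ids.length > 0;
    }

    public static void main(String[] args) {
        System.out.println("WAITING threads: " + waitingThreadNames());
        System.out.println("JVM-level deadlock: " + hasJvmLevelDeadlock());
    }
}
```

Note that findDeadlockedThreads only reports cycles of threads blocked on monitors or java.util.concurrent locks. Threads parked in Object.wait() for input that never arrives, as in the dumps above, are not reported by it, so a negative result does not rule out a stall at the application level.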

Hoping anyone might be able to give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra






Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

James Yu
Miguel, my colleague and I ran into the same problem yesterday.
We were expecting Flink to get 4 inputs from Kafka and write them to Cassandra, but the operators got stuck after the 1st input was written into Cassandra.
This is what the DAG looks like:
Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
After we disabled automatic chaining (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups), all 4 inputs were read from Kafka and written into Cassandra.
We are still figuring out why the chaining causes the blocking.


This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275

2018-04-18 6:57 GMT+08:00 Miguel Coimbra <[hidden email]>:
Chesnay, following your suggestions I got access to the web interface and also took a closer look at the debugging logs.
I have noticed one problem regarding the web interface port - it keeps changing port now and then during my Java program's execution.

Not sure if that is due to my program launching several job executions sequentially, but the fact is that it happened.
Since I am accessing the web interface via tunneling, it becomes rather cumbersome to keep adapting it.

Another particular problem I'm noticing is that this exception frequently pops up (debugging with log4j):

00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
        at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
        at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
        at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
        at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
        at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
        at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Don't know if the internals of Flink are explicitly using an exception for control flow, but there are several occurrences of this as time goes by.

Regarding my program itself, I've achieved some progress.
In my program I need to run a sequence of Flink jobs, and I need to take extra care that no DataSet instance from job i is used in an operator in job i + 1.
I believe this was causing the waiting scenarios I described in an earlier email.
The bottom line is to be extra careful about when job executions are actually triggered. A DataSet that is needed in different Flink jobs must be made available, for example, as a file in secondary storage (possibly masked as a memory mapping) and must be read exclusively from that source.
This means the job that originally produces a DataSet (for reuse in a later job) has to assign it a DataSink for secondary storage.

I'm going to keep digging with this in mind. I will report back if I manage to fix everything or find a new problem.

Thanks again,



Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 10:26, Chesnay Schepler <[hidden email]> wrote:
Ah yes, currently when you use that method the UI is started on a random port. I'm currently fixing that in this PR that will be merged today. For now you will have to enable logging and search for something along the lines of "http://<host>:<port> was granted leadership".
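
The log search can also be automated. The sketch below takes the message shape quoted above ("http://<host>:<port> was granted leadership") at face value; the class name WebUiUrlFinder, the regular expression, and the sample log line are all made up for the example, and the exact wording logged by a given Flink version may differ.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper class, named only for this example.
class WebUiUrlFinder {

    // Assumed shape: a URL with an explicit port, followed by "was granted leadership".
    private static final Pattern LEADERSHIP_URL =
            Pattern.compile("(https?://[^\\s:/]+:\\d+)\\S*\\s+was granted leadership");

    /** Returns the URL from a log line that matches the assumed shape, if any. */
    static Optional<String> findUrl(String logLine) {
        Matcher m = LEADERSHIP_URL.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Invented log line, written only to exercise the pattern.
        String line = "INFO  SomeEndpoint - http://localhost:43521 was granted leadership";
        System.out.println(findUrl(line).orElse("no match"));
    }
}
```

Feeding each log line through findUrl and keeping the most recent match would give the port to tunnel to, under the assumption that the message format matches.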

Sorry for the inconvenience.

On 16.04.2018 15:04, Miguel Coimbra wrote:
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml would make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web in my pom.xml, recompiled and launched the program but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.


It seems something was bound to localhost:8081 but the connection is not working for some reason.
I probably am skipping some important detail.
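
A "Connection refused" response generally means that no process is accepting connections on that port at that address. A small probe like the following can confirm whether anything is listening. It is a sketch using java.net.Socket; the class name PortProbe and the one-second timeout are illustrative choices.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper class, named only for this example.
class PortProbe {

    /** True if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused" as well as connect timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Port 8081 is the one tried in the wget call above.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8081;
        System.out.println("localhost:" + port + " listening: "
                + isListening("localhost", port, 1000));
    }
}
```

Running the probe against a handful of candidate ports is a quick way to tell "the UI is not running" apart from "the UI is running on a different port".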
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
     <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
     <version>${flink.version}</version>
</dependency>


Have you managed to get the web front-end in local mode?


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 05:12, Chesnay Schepler <[hidden email]> wrote:
The thing with createLocalEnvironmentWithWebUI is that it requires flink-runtime-web to be on the classpath, which is rarely the case when running things in the IDE.
It should work fine in the IDE if you add it as a dependency to your project. This should've been logged as a warning.

Chaining is unrelated to this issue as join operators are never chained to one another.
Lambda functions are also not the issue, if they were the job would fail much earlier.

It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input hence produces no output, which now also blocks T3.
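
The effect described here, where one starved stage starves the next, can be reproduced with plain JDK queues. This is only an analogy and does not use Flink's network stack; the class name StalledPipelineDemo, the thread names, and the 500 ms delay are illustrative assumptions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, named only for this example.
class StalledPipelineDemo {

    /** Starts a daemon thread that forwards every record from 'in' to 'out'. */
    static Thread stage(String name, BlockingQueue<String> in, BlockingQueue<String> out) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    out.put(in.take()); // take() parks the thread while 'in' is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(); // never receives a record
        BlockingQueue<String> middle = new LinkedBlockingQueue<>();
        BlockingQueue<String> sink = new LinkedBlockingQueue<>();

        Thread t1 = stage("T1", source, middle);
        Thread t3 = stage("T3", middle, sink);

        Thread.sleep(500); // give both threads time to reach take()
        System.out.println(t1.getName() + " " + t1.getState());
        System.out.println(t3.getName() + " " + t3.getState());
    }
}
```

Both threads are expected to report WAITING, because neither queue ever receives a record. CPU usage is zero in that situation even though nothing has failed.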

There are multiple possible explanations I can come up with:
* the preceding operators are blocked on something or really slow
* the preceding operators are actually finished, but aren't shutting down due to an implementation error
* a deadlock in Flink's join logic
* a deadlock in Flink's network stack

For the first 2 we will have to consult the UI or logs. You said you were dumping the input DataSets into files, but were they actually complete?

A deadlock in the network stack should appear as all existing operator threads being blocked.
We can probably rule out a problem with the join logic by removing the second join and trying again.



On 16.04.2018 03:10, Miguel Coimbra wrote:
Hello,

It would seem that the function which is supposed to launch local mode with the web front-end doesn't launch the front-end at all...
This function seems not to be doing what it is supposed to do, if I'm not mistaken:

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Regarding the preceding operators, the thread dumps I got were pointing to a specific set of operations over DataSet instances that were passed into my function.
Below I show the code segment and put the lines where threads are waiting in bold:

public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
    return vertices
            .joinWithHuge(originalGraph.getEdges())
            .where(0).equalTo(0)
            .with((source, edge) -> edge) // Thread 1 is blocked here
            .returns(originalGraph.getEdges().getType())
            .join(vertices)
            .where(1).equalTo(0)
            .with((e, v) -> e) // Thread 3 is blocked here
            .returns(originalGraph.getEdges().getType())
            .distinct(0, 1);
}


Note: the edges inside the graph originalGraph edge DataSet are much greater in number than the elements of the vertices DataSet, so I believe that function is being used correctly.

I will try testing with remote (cluster) mode to have access to the web front-end, but I have some questions for now:

- The fact that they are blocked in different ​JoinOperator instances that are chained, is this a result of Flink's default pipeline mechanism?
- Could there be a problem stemming from the fact they are both waiting on lambdas?
- I have tried dumping both DataSet variables originalGraph and vertices into files (the ones being used in this code), and they produced correct values (non-empty files), so I don't have a clue what the threads inside Flink's runtime are waiting on.

​Thanks for the help so far Chesnay.​


Miguel E. Coimbra
Email: [hidden email]


---------- Forwarded message ----------
From: Chesnay Schepler <[hidden email]>
To: [hidden email]
Cc: 
Bcc: 
Date: Sun, 15 Apr 2018 18:54:33 +0200
Subject: Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING
Hello,

Thread #1-3 are waiting for input, Thread #4 is waiting for the job to finish.

To further debug this I would look into what the preceding operators are doing, whether they are blocked on something or are emitting records (which you can check in the UI/metrics).

On 15.04.2018 18:40, Miguel Coimbra wrote:
​Hello,

I am running into a situation where the Flink threads responsible for my operator execution are all stuck on WAITING mode.
Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of memory, so perhaps that may be triggering some execution edge-case?

Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)


Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with LocalEnvironment on this large-memory machine, with parallelism set to one:

Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential jobs (any data dependency between jobs is flushed to disk on job i and read back from disk into a DataSet in job i + 1).
To reiterate, I am not launching a Flink cluster, I am just executing in local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different values of memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and the result is always the same: the process' memory fills up completely and then the process' CPU usage drops to 0%.
This is strange because if it was lack of memory, I would expect an OutOfMemoryError.

I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized quite a few operator threads are stuck on java.lang.Thread.State: WAITING.

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

Number 1:

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)


Number 2:

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 3:

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 4:

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291).
      at java.lang.Thread.run(Thread.java:748)


While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
But if it was resource scarcity I believe the program would terminate with an exception.
And if it was garbage collection activity, I believe the JVM process would not be at 0% CPU usage.

Note: I realize I didn't provide the user code that generates the execution plan for Flink which led to the contexts in which the threads are waiting, but I hope it may not be necessary.
My problem now is that I am unsure how to proceed to debug this issue further:
- The assigned memory is fully used, but there are no exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal exactly they are waiting for.
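
To see what each waiting thread is parked on without attaching a debugger, the JVM's own ThreadMXBean can be queried from within the program (the jstack tool that ships with the JDK gives similar output from outside). A minimal sketch, assuming it is called from the same JVM that runs the local Flink environment; WaitingThreadReport is a hypothetical name, not a Flink class:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists every thread that is currently in the WAITING state.
final class WaitingThreadReport {

    /** One line per WAITING thread: name, the object it waits on (may be null), top stack frame. */
    static List<String> waitingThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        // (false, false): skip locked-monitor and synchronizer details, keep the stack traces.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null || info.getThreadState() != Thread.State.WAITING) {
                continue;
            }
            StackTraceElement[] stack = info.getStackTrace();
            String top = stack.length > 0 ? stack[0].toString() : "<no frames>";
            report.add(info.getThreadName() + " | waiting on " + info.getLockName() + " | at " + top);
        }
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        final Object lock = new Object();
        Thread consumer = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait(); // nobody ever calls notify(), so this thread stays WAITING
                } catch (InterruptedException ignored) {
                    // exit quietly
                }
            }
        }, "demo-consumer");
        consumer.setDaemon(true);
        consumer.start();
        while (consumer.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }
        waitingThreads().forEach(System.out::println);
    }
}
```

The "waiting on" column only says which object the thread is parked on; which other thread was supposed to signal it still has to be worked out from the code.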

Hoping anyone might be able to give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra







Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

Miguel Coimbra
Hello James,

Thanks for the information.
I noticed something suspicious as well: I have chains of operators where the first operator ingests the expected number of records but does not emit any, leaving the following operator in a "RUNNING" state with nothing to process.
For example:



I will get back if I find out more.


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 17 April 2018 at 20:59, James Yu <[hidden email]> wrote:
Miguel, my colleague and I ran into the same problem yesterday.
We were expecting Flink to get 4 inputs from Kafka and write them to Cassandra, but the operators got stuck after the first input was written to Cassandra.
This is what the DAG looks like:
Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
After we disabled auto chaining (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups), all 4 inputs were read from Kafka and written to Cassandra.
We are still figuring out why chaining causes the blocking.


This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275

2018-04-18 6:57 GMT+08:00 Miguel Coimbra <[hidden email]>:
Chesnay, following your suggestions I got access to the web interface and also took a closer look at the debugging logs.
I have noticed one problem regarding the web interface: its port keeps changing every now and then during my Java program's execution.

Not sure if that is due to my program launching several job executions sequentially, but the fact is that it happened.
Since I am accessing the web interface via tunneling, it becomes rather cumbersome to keep adapting it.

Another particular problem I'm noticing is that this exception frequently pops up (debugging with log4j):

00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
        at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
        at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
        at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
        at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
        at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
        at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I don't know if Flink's internals are deliberately using an exception for control flow here, but this shows up several times as time goes by.

Regarding my program itself, I've made some progress.
My program needs to run a sequence of Flink jobs, and extra care is needed to make sure no DataSet instance from job i is used in an operator in job i + 1.
I believe this was generating the waiting scenarios I described in an earlier email.
The bottom line is to be extra careful about when job executions are actually triggered, and to make sure that a DataSet which needs to be used in different Flink jobs is available, for example, as a file in secondary storage (possibly masked as a memory mapping) and is read exclusively from that source.
This means ensuring that the job which originally produces a DataSet (for reuse in a later job) assigns it a DataSink for secondary storage.
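
The idea of handing data from job i to job i + 1 only through secondary storage can be illustrated without any Flink API. The following is a plain-Java analogy, not Flink code: the two methods stand in for two separate job executions, and the only thing they share is a file path. All names here are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy (not Flink API): stage i + 1 reads only what stage i wrote to disk.
final class StagedJobs {

    /** "Job i": computes squares and materializes them to the given file, one per line. */
    static void runJobI(List<Integer> input, Path output) {
        List<String> lines = input.stream()
                .map(x -> Long.toString(x.longValue() * x))
                .collect(Collectors.toList());
        try {
            Files.write(output, lines);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** "Job i + 1": knows nothing about job i except the file it left behind. */
    static long runJobIPlus1(Path input) {
        try {
            return Files.readAllLines(input).stream().mapToLong(Long::parseLong).sum();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path handoff = Files.createTempFile("job-i-output", ".txt");
        runJobI(Arrays.asList(1, 2, 3), handoff);
        System.out.println(runJobIPlus1(handoff)); // prints 14
        Files.delete(handoff);
    }
}
```

Because the second stage takes only a path, there is no way for it to accidentally hold on to an in-memory object from the first stage, which is the property being aimed for above.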

I'm going to keep digging, taking this into account - I will report back if I manage to fix everything or find a new problem.

Thanks again,



Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 10:26, Chesnay Schepler <[hidden email]> wrote:
Ah yes, currently when you use that method the UI is started on a random port. I'm currently fixing that in this PR that will be merged today. For now you will have to enable logging and search for something along the lines of "http://<host>:<port> was granted leadership".
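
Since the port changes between runs, pulling the address out of the log programmatically can save some manual searching. A small sketch, assuming only that the log line contains the phrase quoted above; the exact surrounding log format is not known here, so the sample line in main is made up, and WebUiAddressFinder is a hypothetical name:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: finds "<url> was granted leadership" in a log line.
final class WebUiAddressFinder {

    private static final Pattern GRANTED =
            Pattern.compile("(https?://\\S+)\\s+was granted leadership");

    static Optional<String> findAddress(String logLine) {
        Matcher m = GRANTED.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Made-up sample line; a real Flink log line may carry other fields around the phrase.
        String sample = "00:17:54,368 INFO  SomeLogger - http://localhost:43215 was granted leadership";
        System.out.println(findAddress(sample).orElse("not found"));
    }
}
```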

Sorry for the inconvenience.

On 16.04.2018 15:04, Miguel Coimbra wrote:
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml would make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web to my pom.xml, recompiled and launched the program, but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.


The connection was refused on both addresses, so it seems nothing was actually listening on localhost:8081.
I probably am skipping some important detail.
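
A quick way to check from Java whether anything is accepting connections on a given port (the same question the wget call above answers) is a plain TCP connect with a timeout. This sketch uses only java.net; PortProbe is a hypothetical name. A refused connection generally means no process is listening on that address and port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: reports whether a TCP connect to host:port succeeds.
final class PortProbe {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8081;
        System.out.println("localhost:" + port + " listening: " + isListening("localhost", port, 1000));
    }
}
```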
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>


Have you managed to get the web front-end in local mode?


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 05:12, Chesnay Schepler <[hidden email]> wrote:
The thing with createLocalEnvironmentWithWebUI is that it requires flink-runtime-web to be on the classpath, which is rarely the case when running things in the IDE.
It should work fine in the IDE if you add it as a dependency to your project. This should've been logged as a warning.

Chaining is unrelated to this issue, as join operators are never chained to one another.
Lambda functions are also not the issue; if they were, the job would fail much earlier.

It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input hence produces no output, which now also blocks T3.
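
The chain reaction described here can be reproduced in miniature with plain Java threads. The sketch below is an analogy, not Flink's network stack: two forwarding threads named T1 and T3 are connected by queues, nothing is ever fed into T1, and both end up in WAITING even though neither is at fault. StalledPipeline and its methods are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Analogy only: a two-stage pipeline whose first stage never receives input.
final class StalledPipeline {

    /** Starts a daemon thread that forwards records from in to out until interrupted. */
    static Thread forwarder(String name, BlockingQueue<String> in, BlockingQueue<String> out) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    out.put(in.take()); // take() blocks while the input queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Polls until the thread reaches WAITING or the timeout elapses. */
    static boolean reachesWaiting(Thread t, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() < deadline) {
            if (t.getState() == Thread.State.WAITING) {
                return true;
            }
            Thread.yield();
        }
        return t.getState() == Thread.State.WAITING;
    }

    public static void main(String[] args) {
        BlockingQueue<String> sourceToT1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> t1ToT3 = new LinkedBlockingQueue<>();
        BlockingQueue<String> sink = new LinkedBlockingQueue<>();
        Thread t1 = forwarder("T1", sourceToT1, t1ToT3);
        Thread t3 = forwarder("T3", t1ToT3, sink);
        // Nothing is ever written to sourceToT1, so T1 stalls and T3 stalls behind it.
        System.out.println("T1 waiting: " + reachesWaiting(t1, 2000));
        System.out.println("T3 waiting: " + reachesWaiting(t3, 2000));
    }
}
```

In this toy version the thing to investigate is whatever should have filled the first queue, not T1 or T3 themselves.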

There are multiple possible explanations I can come up with:
* the preceding operators are blocked on something or really slow
* the preceding operators are actually finished, but aren't shutting down due to an implementation error
* a deadlock in Flink's join logic
* a deadlock in Flink's network stack

For the first 2 we will have to consult the UI or logs. You said you were dumping the input DataSets into files, but were they actually complete?

A deadlock in the network stack should appear as all existing operator threads being blocked.
We can probably rule out a problem with the join logic by removing the second join and trying again.



On 16.04.2018 03:10, Miguel Coimbra wrote:
Hello,

It would seem that the function which is supposed to launch local mode with the web front-end doesn't launch the front-end at all...
This function seems not to be doing what it is supposed to do, if I'm not mistaken:

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Regarding the preceding operators, the thread dumps I got were pointing to a specific set of operations over DataSet instances that were passed into my function.
Below I show the code segment, with comments marking the lines where threads are waiting:

public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
    return vertices
            .joinWithHuge(originalGraph.getEdges())
            .where(0).equalTo(0)
            .with((source, edge) -> edge) // Thread 1 is blocked here
            .returns(originalGraph.getEdges().getType())
            .join(vertices)
            .where(1).equalTo(0)
            .with((e, v) -> e) // Thread 3 is blocked here
            .returns(originalGraph.getEdges().getType())
            .distinct(0, 1);
}


Note: the edge DataSet of originalGraph has far more elements than the vertices DataSet, so I believe joinWithHuge is being used correctly.
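
For readers trying to follow what the two joins compute: reading the keys literally (vertex field 0 against edge field 0, then edge field 1 against vertex field 0), the function appears to keep the edges whose source and target are both in vertices, de-duplicated on the (source, target) pair. An in-memory sketch of that reading in plain Java collections, assuming vertex IDs are unique; the Edge class and all names here are illustrative stand-ins, not Gelly's types:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative in-memory stand-in for the Flink plan above (not Gelly's API).
final class SelectEdgesSketch {

    static final class Edge {
        final long source;
        final long target;

        Edge(long source, long target) {
            this.source = source;
            this.target = target;
        }

        @Override
        public String toString() {
            return source + "->" + target;
        }
    }

    static List<Edge> selectEdges(List<Edge> edges, Set<Long> vertexIds) {
        Set<List<Long>> seen = new HashSet<>();
        List<Edge> selected = new ArrayList<>();
        for (Edge e : edges) {
            boolean sourceKept = vertexIds.contains(e.source); // first join: vertex id == edge source
            boolean targetKept = vertexIds.contains(e.target); // second join: edge target == vertex id
            // distinct(0, 1): keep one edge per (source, target) pair
            if (sourceKept && targetKept && seen.add(Arrays.asList(e.source, e.target))) {
                selected.add(e);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Edge> edges = Arrays.asList(new Edge(1, 2), new Edge(2, 3), new Edge(1, 2), new Edge(3, 4));
        Set<Long> vertexIds = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        System.out.println(selectEdges(edges, vertexIds)); // prints [1->2, 2->3]
    }
}
```

Running the real inputs through a check like this on a small sample is one way to confirm the expected output size before looking for the stall in the runtime.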

I will try testing with remote (cluster) mode to have access to the web front-end, but I have some questions for now:

- Is the fact that they are blocked in different JoinOperator instances that are chained a result of Flink's default pipeline mechanism?
- Could there be a problem stemming from the fact that they are both waiting on lambdas?
- I have tried dumping both DataSet variables used in this code, originalGraph and vertices, into files, and they produced correct values (non-empty files), so I don't have a clue what the threads inside Flink's runtime are waiting on.

Thanks for the help so far, Chesnay.


Miguel E. Coimbra
Email: [hidden email]


---------- Forwarded message ----------
From: Chesnay Schepler <[hidden email]>
To: [hidden email]
Date: Sun, 15 Apr 2018 18:54:33 +0200
Subject: Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING
Hello,

Threads #1-3 are waiting for input; thread #4 is waiting for the job to finish.

To further debug this I would look into what the preceding operators are doing, whether they are blocked on something or are emitting records (which you can check in the UI/metrics).
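
Thread #4's stack (parked inside CompletableFuture.get) is simply what a caller looks like while it blocks on a result that has not arrived yet. A self-contained sketch of that pattern in plain Java, with hypothetical names; it is not Flink's client code:

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: a caller thread that blocks on a future until someone completes it.
final class BlockingClientSketch {

    /** Starts a daemon thread that blocks in jobResult.get(), like a client awaiting a job result. */
    static Thread awaitInBackground(CompletableFuture<String> jobResult) {
        Thread t = new Thread(() -> {
            try {
                jobResult.get(); // blocks until complete(...) is called
            } catch (Exception e) {
                // interrupted or completed exceptionally; nothing to do in this sketch
            }
        }, "client-thread");
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Polls until the thread reaches the wanted state or the timeout elapses. */
    static boolean reachesState(Thread t, Thread.State wanted, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() < deadline) {
            if (t.getState() == wanted) {
                return true;
            }
            Thread.yield();
        }
        return t.getState() == wanted;
    }

    public static void main(String[] args) {
        CompletableFuture<String> jobResult = new CompletableFuture<>();
        Thread client = awaitInBackground(jobResult);
        System.out.println("waiting: " + reachesState(client, Thread.State.WAITING, 2000));
        jobResult.complete("finished"); // the "job" finishes, which releases the client
        System.out.println("terminated: " + reachesState(client, Thread.State.TERMINATED, 2000));
    }
}
```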

On 15.04.2018 18:40, Miguel Coimbra wrote:
Hello,

I am running into a situation where the Flink threads responsible for my operator execution are all stuck on WAITING mode.
Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of memory, so perhaps that may be triggering some execution edge-case?

Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)


Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with LocalEnvironment on this large-memory machine, with parallelism set to one:

Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential jobs (any data dependency between jobs is flushed to disk on job i and read back from disk into a DataSet in job i + 1).
To reiterate, I am not launching a Flink cluster, I am just executing in local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different values of memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and the result is always the same: the process' memory fills up completely and then the process' CPU usage drops to 0%.
This is strange because if it was lack of memory, I would expect an OutOfMemoryError.

I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized quite a few operator threads are stuck on java.lang.Thread.State: WAITING.

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

Number 1:

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)


Number 2:

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 3:

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 4:

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291)
      at java.lang.Thread.run(Thread.java:748)


While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
But if it were resource scarcity, I believe the program would terminate with an exception.
And if it were garbage collection activity, I believe the JVM process would not be at 0% CPU usage.

Note: I realize I didn't provide the user code that generates the execution plan for Flink which led to the contexts in which the threads are waiting, but I hope it may not be necessary.
My problem now is that I am unsure how to proceed to debug this issue further:
- The assigned memory is fully used, but there are no exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal exactly they are waiting for.

Hoping anyone might be able to give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra








Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

Miguel Coimbra
In reply to this post by Miguel Coimbra
Hello,

Just to provide a brief update: I got this working by moving to the stable version 1.4.2.
I previously tested under 1.5-SNAPSHOT and 1.6-SNAPSHOT and the problem occurred in both.

If I'm not mistaken, LocalEnvironment is primarily targeted at debugging scenarios.
In my case, I explicitly want to use it on a complex series of jobs for now.
However, it seems some sort of bug was introduced after 1.4.2?

I ask because the same code leaves the operators stuck on java.lang.Thread.State: WAITING in the snapshot versions but works fine in 1.4.2.
Was there any specific design change after 1.4.2 regarding the way the Flink cluster is simulated (LocalFlinkMiniCluster, if I'm not mistaken?) when using LocalEnvironment?

I would like to explore this issue and perhaps contribute to fixing it, or at least understand it.

Thank you very much.


Miguel E. Coimbra
Email: [hidden email]


On 17 April 2018 at 22:52, Miguel Coimbra <[hidden email]> wrote:
Hello James,

Thanks for the information.
I noticed something suspicious as well: I have chains of operators where the first operator will ingest the expected amount of records but will not emit any, leaving the following operator empty in a "RUNNING" state.
For example:



I will get back if I find out more.


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 17 April 2018 at 20:59, James Yu <[hidden email]> wrote:
Miguel, I and my colleague ran into same problem yesterday.
We were expecting Flink to get 4 inputs from Kafka and write the inputs to Cassandra, but the operators got stuck after the 1st input is written into Cassandra.
This is how DAG looks like:
Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
After we disable the auto chaining (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups), all 4 inputs are read from Kafka and written into Cassandra.
We are still figuring out why the chaining causes the blocking.


This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275

2018-04-18 6:57 GMT+08:00 Miguel Coimbra <[hidden email]>:
Chesnay, following your suggestions I got access to the web interface and also took a closer look at the debugging logs.
I have noticed one problem regarding the web interface port - it keeps changing port now and then during my Java program's execution.

Not sure if that is due to my program launching several job executions sequentially, but the fact is that it happened.
Since I am accessing the web interface via tunneling, it becomes rather cumbersome to keep adapting it.

Another particular problem I'm noticing is that this exception frequently pops up (debugging with log4j):

00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
        at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
        at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
        at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
        at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
        at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
        at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Don't know if the internals of Flink are explicitly using an exception for control flow, but there are several occurrences of this as time goes by.

Regarding my program itself, I've achieved some progress.
In my program I need to do a sequence of series of Flink jobs, and need extra care to make sure no DataSet instance from job i is being used in an operator in job i + 1.
I believe this was generating the waiting scenarios I describe in an earlier email.
The bottom line is to be extra careful about when job executions are actually triggered and to make sure that a DataSet which will need to be used in different Flink jobs is available for example as a file in secondary storage (possibly masked as a memory-mapping) and is exclusively read from that source.
This means ensuring the job that originally produces a DataSet (for reuse on a later job) assigns to it a DataSink for secondary storage.

I'm going to keep digging taking this in account - if will report back if I manage to fix everything or find a new problem.

Thanks again,



Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 10:26, Chesnay Schepler <[hidden email]> wrote:
ah yes, currently when you use that method the UI is started on a random port. I'm currently fixing that in this PR that will be merged today. For now you will enable logging and search for something along the lines of "http://<host>:<port> was granted leadership"

Sorry for the inconvenience.

On 16.04.2018 15:04, Miguel Coimbra wrote:
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml would make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web  in my pom.xml, recompiled and launched the program but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.


It seems something was bound to localhost:8081 but the connection is not working for some reason.
I probably am skipping some important detail.
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>


Have you managed to get the web front-end in local mode?


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 05:12, Chesnay Schepler <[hidden email]> wrote:
The thing with createLocalEnvironmentWithWebUI is that it requires flink-runtime-web to be on the classpath, which is rarely the case when running things in the IDE.
It should work fine in the IDE if you add it as a dependency to your project. This should've been logged as a warning.

Chaining is unrelated to this issue as join operators are never chained to one another.
Lambda functions are also not the issue, if they were the job would fail much earlier.

It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input hence produces no output, which now also blocks T3.

There are multiple possible explanations I can come up with:
* the preceding operators are blocked on something or really slow
* the preceding operators are actually finished, but aren't shutting down due to an implementation error
* a deadlock in Flink's join logic
* a deadlock in Flink's network stack

For the first 2 we will have to consult the UI or logs. You said you were dumping the input DataSets into files, but were they actually complete?

A deadlock in the network stack should appear as all existing operator threads being blocked.
We can probably rule out a problem with the join logic by removing the second join and trying again.



On 16.04.2018 03:10, Miguel Coimbra wrote:
Hello,

It would seem that the function which is supposed to launch local mode with the web front-end doesn't launch the front-end at all...
This function seems not to be doing what it is supposed to do, if I'm not mistaken:

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Regarding the preceding operators, the thread dumps I got were pointing to a specific set of operations over DataSet instances that were passed into my function.
Below I show the code segment, with comments marking the lines where threads are waiting:

public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
    return vertices
            .joinWithHuge(originalGraph.getEdges())
            .where(0).equalTo(0)
            .with((source, edge) -> edge) // Thread 1 is blocked here
            .returns(originalGraph.getEdges().getType())
            .join(vertices)
            .where(1).equalTo(0)
            .with((e, v) -> e) // Thread 3 is blocked here
            .returns(originalGraph.getEdges().getType())
            .distinct(0, 1);
}


Note: the edge DataSet of originalGraph contains far more elements than the vertices DataSet, so I believe joinWithHuge is being used correctly.

I will try testing with remote (cluster) mode to have access to the web front-end, but I have some questions for now:

- They are blocked in different JoinOperator instances that are chained; is this a result of Flink's default pipeline mechanism?
- Could there be a problem stemming from the fact they are both waiting on lambdas?
- I have tried dumping both DataSet variables originalGraph and vertices into files (the ones being used in this code), and they produced correct values (non-empty files), so I don't have a clue what the threads inside Flink's runtime are waiting on.

Thanks for the help so far, Chesnay.


Miguel E. Coimbra
Email: [hidden email]


---------- Forwarded message ----------
From: Chesnay Schepler <[hidden email]>
To: [hidden email]
Cc: 
Bcc: 
Date: Sun, 15 Apr 2018 18:54:33 +0200
Subject: Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING
Hello,

Thread #1-3 are waiting for input, Thread #4 is waiting for the job to finish.

To further debug this I would look into what the preceding operators are doing, whether they are blocked on something or are emitting records (which you can check in the UI/metrics).

On 15.04.2018 18:40, Miguel Coimbra wrote:
Hello,

I am running into a situation where the Flink threads responsible for my operator execution are all stuck on WAITING mode.
Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of memory, so perhaps that may be triggering some execution edge-case?

Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)


Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with LocalEnvironment on this large-memory machine, with parallelism set to one:

Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential jobs (any data dependency between jobs is flushed to disk on job i and read back from disk into a DataSet in job i + 1).
To reiterate, I am not launching a Flink cluster, I am just executing in local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different values of memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and the result is always the same: the process' memory fills up completely and then the process' CPU usage drops to 0%.
This is strange because if it was lack of memory, I would expect an OutOfMemoryError.
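
One way to tell an exhausted heap from one that is merely reserved by the JVM is to log heap figures from inside the process. The following is a minimal sketch using only java.lang.Runtime; the class name HeapSnapshot is hypothetical, and the figures cover the Java heap only, not off-heap memory.

```java
public class HeapSnapshot {
    /** Returns {used, committed, max} heap sizes in bytes. */
    static long[] snapshot() {
        Runtime rt = Runtime.getRuntime();
        long committed = rt.totalMemory();        // heap currently reserved by the JVM
        long used = committed - rt.freeMemory();  // part of it occupied by objects (including garbage)
        long max = rt.maxMemory();                // upper bound, roughly the -Xmx value
        return new long[] {used, committed, max};
    }

    /** Formats a snapshot in whole megabytes. */
    static String format(long[] s) {
        long mb = 1024L * 1024L;
        return "used=" + s[0] / mb + "MB committed=" + s[1] / mb + "MB max=" + s[2] / mb + "MB";
    }

    public static void main(String[] args) {
        System.out.println(format(snapshot()));
    }
}
```

If "committed" is close to "max" while "used" stays well below it, the process can look full from the operating system's point of view without the JVM being out of heap.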

I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized quite a few operator threads are stuck on java.lang.Thread.State: WAITING.

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

Number 1:

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)


Number 2:

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 3:

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 4:

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291)
      at java.lang.Thread.run(Thread.java:748)


While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
But if it was resource scarcity I believe the program would terminate with an exception.
And if it was garbage collection activity, I believe the JVM process would not be at 0% CPU usage.

Note: I realize I didn't provide the user code that generates the execution plan for Flink and led to the contexts in which the threads are waiting, but I hope it may not be necessary.
My problem now is that I am unsure how to proceed to further debug this issue:
- The assigned memory is fully used, but there are no exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal they're waiting for exactly.
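
A summary of the waiting threads can also be collected from inside the JVM rather than through the IDE. The sketch below is an assumption-laden illustration, not something Flink provides: the class name WaitingThreadReport is hypothetical, and it relies only on Thread.getAllStackTraces() from the standard library. It lists each thread in the WAITING state together with its top stack frame, which is where the threads above show Object.wait or Unsafe.park.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class WaitingThreadReport {
    /** Returns one line per thread that is currently in the WAITING state. */
    static List<String> describeWaitingThreads() {
        List<String> report = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            // The state is read after the stack snapshot, so a thread may have
            // moved on in between; treat the result as a rough picture.
            if (thread.getState() != Thread.State.WAITING) {
                continue;
            }
            StackTraceElement[] stack = entry.getValue();
            String top = stack.length > 0 ? stack[0].toString() : "<no frames>";
            report.add(thread.getName() + " WAITING at " + top);
        }
        return report;
    }

    public static void main(String[] args) {
        for (String line : describeWaitingThreads()) {
            System.out.println(line);
        }
    }
}
```

Calling describeWaitingThreads() periodically from a helper thread while the job runs would show whether the same operators stay parked over time.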

Hoping anyone might be able to give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra









Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

Fabian Hueske-2
Hi Miguel,

Actually, a lot has changed since 1.4.
Flink 1.5 will feature a completely reworked (cluster) setup and deployment model. The dev effort is known as FLIP-6 [1].
So it is not unlikely that you discovered a regression.

Would you mind opening a JIRA ticket for the issue?

Thank you very much,
Fabian


[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077


2018-04-21 20:08 GMT+02:00 Miguel Coimbra <[hidden email]>:
Hello,

Just to provide a brief update: I got this working by moving to the stable version 1.4.2.
I previously tested under 1.5-SNAPSHOT and 1.6-SNAPSHOT and the problem occurred in both.

If I'm not mistaken, LocalEnvironment is primarily targeted at debugging scenarios.
In my case, I explicitly want to use it on a complex series of jobs for now.
However, it seems some sort of bug was introduced after 1.4.2?

I ask this because my same code leads to the operators stuck on java.lang.Thread.State: WAITING in the snapshot versions but it works fine in 1.4.2.
Was there any specific design change after 1.4.2 regarding the way the Flink cluster is simulated (LocalFlinkMiniCluster if I'm not mistaken?) when using LocalEnvironment?

I would like to explore this issue and perhaps contribute to fixing it, or at least understand it.

Thank you very much.


Miguel E. Coimbra
Email: [hidden email]


On 17 April 2018 at 22:52, Miguel Coimbra <[hidden email]> wrote:
Hello James,

Thanks for the information.
I noticed something suspicious as well: I have chains of operators where the first operator will ingest the expected number of records but will not emit any, leaving the following operator empty in a "RUNNING" state.
For example:



I will get back if I find out more.


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 17 April 2018 at 20:59, James Yu <[hidden email]> wrote:
Miguel, my colleague and I ran into the same problem yesterday.
We were expecting Flink to get 4 inputs from Kafka and write them to Cassandra, but the operators got stuck after the 1st input was written into Cassandra.
This is what the DAG looks like:
Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
After we disabled auto chaining (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups), all 4 inputs were read from Kafka and written into Cassandra.
We are still figuring out why the chaining causes the blocking.


This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275

2018-04-18 6:57 GMT+08:00 Miguel Coimbra <[hidden email]>:
Chesnay, following your suggestions I got access to the web interface and also took a closer look at the debugging logs.
I have noticed one problem regarding the web interface port - it keeps changing port now and then during my Java program's execution.

Not sure if that is due to my program launching several job executions sequentially, but the fact is that it happened.
Since I am accessing the web interface via tunneling, it becomes rather cumbersome to keep adapting it.

Another particular problem I'm noticing is that this exception frequently pops up (debugging with log4j):

00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
        at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
        at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
        at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
        at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
        at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
        at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Don't know if the internals of Flink are explicitly using an exception for control flow, but there are several occurrences of this as time goes by.

Regarding my program itself, I've achieved some progress.
In my program I need to run a sequence of Flink jobs, and I need to take extra care to make sure no DataSet instance from job i is used in an operator in job i + 1.
I believe this was generating the waiting scenarios I describe in an earlier email.
The bottom line is to be extra careful about when job executions are actually triggered and to make sure that a DataSet which will need to be used in different Flink jobs is available for example as a file in secondary storage (possibly masked as a memory-mapping) and is exclusively read from that source.
This means ensuring the job that originally produces a DataSet (for reuse on a later job) assigns to it a DataSink for secondary storage.

I'm going to keep digging, taking this into account. I will report back if I manage to fix everything or find a new problem.

Thanks again,



Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 10:26, Chesnay Schepler <[hidden email]> wrote:
Ah yes, currently when you use that method the UI is started on a random port. I'm currently fixing that in this PR that will be merged today. For now you will have to enable logging and search for something along the lines of "http://<host>:<port> was granted leadership".
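
The log search can be automated with a small helper. This is a rough sketch under stated assumptions: the class name WebUiPortFinder is hypothetical, and the pattern only covers the fragment quoted above ("http://<host>:<port> was granted leadership"); the real log line may carry extra text or a different wording. It returns the port from the last matching line, since the port may differ from one job execution to the next.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class WebUiPortFinder {
    // Assumed fragment: "http://<host>:<port> was granted leadership".
    private static final Pattern LEADERSHIP =
            Pattern.compile("http://[^\\s:/]+:(\\d+)\\s+was granted leadership");

    /** Returns the port from the last matching log line, if any line matches. */
    static Optional<Integer> findPort(Iterable<String> logLines) {
        Optional<Integer> port = Optional.empty();
        for (String line : logLines) {
            Matcher matcher = LEADERSHIP.matcher(line);
            if (matcher.find()) {
                // Later matches overwrite earlier ones.
                port = Optional.of(Integer.parseInt(matcher.group(1)));
            }
        }
        return port;
    }
}
```

The lines can come from any source, for example Files.readAllLines on the log file configured for log4j.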

Sorry for the inconvenience.

On 16.04.2018 15:04, Miguel Coimbra wrote:
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml would make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web to my pom.xml, recompiled and launched the program, but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.


It seems something was bound to localhost:8081 but the connection is not working for some reason.
I probably am skipping some important detail.
These are some of my dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>


Have you managed to get the web front-end in local mode?


Best regards,

Miguel E. Coimbra
Email: [hidden email]


On 16 April 2018 at 05:12, Chesnay Schepler <[hidden email]> wrote:
The thing with createLocalEnvironmentWithWebUI is that it requires flink-runtime-web to be on the classpath, which is rarely the case when running things in the IDE.
It should work fine in the IDE if you add it as a dependency to your project. This should've been logged as a warning.

Chaining is unrelated to this issue as join operators are never chained to one another.
Lambda functions are also not the issue, if they were the job would fail much earlier.

It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input hence produces no output, which now also blocks T3.

There are multiple possible explanations I can come up with:
* the preceding operators are blocked on something or really slow
* the preceding operators are actually finished, but aren't shutting down due to an implementation error
* a deadlock in Flink's join logic
* a deadlock in Flink's network stack

For the first 2 we will have to consult the UI or logs. You said you were dumping the input DataSets into files, but were they actually complete?

A deadlock in the network stack should appear as all existing operator threads being blocked.
We can probably rule out a problem with the join logic by removing the second join and trying again.



On 16.04.2018 03:10, Miguel Coimbra wrote:
Hello,

It would seem that the function which is supposed to launch local mode with the web front-end doesn't launch the front-end at all...
This function seems not to be doing what it is supposed to do, if I'm not mistaken:

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Regarding the preceding operators, the thread dumps I got were pointing to a specific set of operations over DataSet instances that were passed into my function.
Below I show the code segment, with comments marking the lines where threads are waiting:

public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
    return vertices
            .joinWithHuge(originalGraph.getEdges())
            .where(0).equalTo(0)
            .with((source, edge) -> edge) // Thread 1 is blocked here
            .returns(originalGraph.getEdges().getType())
            .join(vertices)
            .where(1).equalTo(0)
            .with((e, v) -> e) // Thread 3 is blocked here
            .returns(originalGraph.getEdges().getType())
            .distinct(0, 1);
}


Note: the edge DataSet of originalGraph contains far more elements than the vertices DataSet, so I believe joinWithHuge is being used correctly.

I will try testing with remote (cluster) mode to have access to the web front-end, but I have some questions for now:

- They are blocked in different JoinOperator instances that are chained; is this a result of Flink's default pipeline mechanism?
- Could there be a problem stemming from the fact they are both waiting on lambdas?
- I have tried dumping both DataSet variables originalGraph and vertices into files (the ones being used in this code), and they produced correct values (non-empty files), so I don't have a clue what the threads inside Flink's runtime are waiting on.

Thanks for the help so far, Chesnay.


Miguel E. Coimbra
Email: [hidden email]


---------- Forwarded message ----------
From: Chesnay Schepler <[hidden email]>
To: [hidden email]
Cc: 
Bcc: 
Date: Sun, 15 Apr 2018 18:54:33 +0200
Subject: Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING
Hello,

Thread #1-3 are waiting for input, Thread #4 is waiting for the job to finish.

To further debug this I would look into what the preceding operators are doing, whether they are blocked on something or are emitting records (which you can check in the UI/metrics).

On 15.04.2018 18:40, Miguel Coimbra wrote:
Hello,

I am running into a situation where the Flink threads responsible for my operator execution are all stuck on WAITING mode.
Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of memory, so perhaps that may be triggering some execution edge-case?

Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)


Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with LocalEnvironment on this large-memory machine, with parallelism set to one:

Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential jobs (any data dependency between jobs is flushed to disk on job i and read back from disk into a DataSet in job i + 1).
To reiterate, I am not launching a Flink cluster, I am just executing in local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different values of memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and the result is always the same: the process' memory fills up completely and then the process' CPU usage drops to 0%.
This is strange because if it was lack of memory, I would expect an OutOfMemoryError.

I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized quite a few operator threads are stuck on java.lang.Thread.State: WAITING.

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

Number 1:

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)


Number 2:

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 3:

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

Number 4:

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291)
      at java.lang.Thread.run(Thread.java:748)


While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
But if it was resource scarcity I believe the program would terminate with an exception.
And if it was garbage collection activity, I believe the JVM process would not be at 0% CPU usage.
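
One way to check the garbage-collection hypothesis from inside the JVM is to compare the accumulated GC time at two moments while the process sits at 0% CPU. The following is a minimal sketch using only the standard java.lang.management API; the class name HeapAndGcSnapshot and its methods are hypothetical helpers for illustration, not part of Flink:

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper: reports heap usage and accumulated GC time of the current JVM.
class HeapAndGcSnapshot {

    /** Sums the accumulated collection time (ms) over all collectors; undefined values (-1) are skipped. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** Formats a one-line summary of heap usage and GC time. */
    static String snapshot() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long mb = 1024L * 1024L;
        // getMax() is -1 when undefined, which prints as 0 MB after the integer division
        return String.format("heap used=%d MB, committed=%d MB, max=%d MB, gcTime=%d ms",
                heap.getUsed() / mb, heap.getCommitted() / mb, heap.getMax() / mb, totalGcMillis());
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```

If snapshot() is logged twice, a few seconds apart, and gcTime does not grow between the two lines, the collector is unlikely to be what the process is spending its time on.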

Note: I realize I didn't provide the user code that generates the execution plan for Flink which led to the contexts in which the threads are waiting, but I hope it may not be necessary.
My problem now is that I am unsure how to proceed to further debug this issue:
- The assigned memory is fully used, but there are no exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal they're waiting for exactly.
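
To see what a waiting thread is waiting on without attaching a debugger, the JVM's ThreadMXBean can list each WAITING thread together with the object it is parked or waiting on. This is a minimal sketch using only the standard library; WaitingThreadReport, waitingThreads and startWaiter are hypothetical names used for illustration, and the reported lock name only identifies the monitor, not the producer that is supposed to signal it:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: summarizes every thread in a waiting state.
class WaitingThreadReport {

    /** Returns one line per thread in WAITING or TIMED_WAITING state. */
    static List<String> waitingThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        // false, false: do not collect locked monitors or ownable synchronizers
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null) {
                continue;
            }
            Thread.State state = info.getThreadState();
            if (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
                continue;
            }
            StackTraceElement[] stack = info.getStackTrace();
            String top = stack.length > 0 ? stack[0].toString() : "<no stack>";
            lines.add(String.format("%s [%s] waiting on %s at %s",
                    info.getThreadName(), state, info.getLockName(), top));
        }
        return lines;
    }

    /** Starts a daemon thread that waits on a monitor nobody notifies, and returns once it is WAITING. */
    static Thread startWaiter(String name) {
        Object monitor = new Object();
        Thread t = new Thread(() -> {
            synchronized (monitor) {
                try {
                    while (true) {
                        monitor.wait(); // no notifier exists: stays WAITING until interrupted
                    }
                } catch (InterruptedException e) {
                    // interrupted: let the thread end
                }
            }
        }, name);
        t.setDaemon(true);
        t.start();
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        return t;
    }

    public static void main(String[] args) {
        Thread t = startWaiter("demo-waiter");
        waitingThreads().forEach(System.out::println);
        t.interrupt();
    }
}
```

The demo thread shows the same symptom as the dumps above: no CPU usage and no exception, because a thread blocked in Object.wait() simply stays there until something notifies it. Finding the missing notifier still requires looking at the upstream side.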

Hoping anyone might be able to give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra









Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

Nico Kruber
In reply to this post by James Yu
Hi James,
it is unlikely that your issue is the same as the one Miguel is having.
His issue, https://issues.apache.org/jira/browse/FLINK-9242, is probably the
same as https://issues.apache.org/jira/browse/FLINK-9144 and happens
only in batch programs spilling data in Flink 1.5 and 1.6 versions
before last Friday.

From the information you provided, I suppose you are running a streaming
job in Flink 1.4, aren't you? Your example looks like a simpler setup: can
you try to minimise it so that you can share the code and we can have a
look?


Regards
Nico

On 18/04/18 01:59, James Yu wrote:

> Miguel, my colleague and I ran into the same problem yesterday.
> We were expecting Flink to get 4 inputs from Kafka and write the inputs
> to Cassandra, but the operators got stuck after the 1st input was written
> into Cassandra.
> This is what the DAG looks like:
> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
> After we disabled auto chaining
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
> all 4 inputs were read from Kafka and written into Cassandra.
> We are still figuring out why the chaining causes the blocking.
>
>
> This is a UTF-8 formatted mail
> -----------------------------------------------
> James C.-C.Yu
> +886988713275
>
> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <[hidden email]
> <mailto:[hidden email]>>:
>
>     Chesnay, following your suggestions I got access to the web
>     interface and also took a closer look at the debugging logs.
>     I have noticed one problem regarding the web interface port - it
>     keeps changing port now and then during my Java program's execution.
>
>     Not sure if that is due to my program launching several job
>     executions sequentially, but the fact is that it happened.
>     Since I am accessing the web interface via tunneling, it becomes
>     rather cumbersome to keep adapting it.
>
>     Another particular problem I'm noticing is that this exception
>     frequently pops up (debugging with log4j):
>
>     00:17:54,368 DEBUG
>     org.apache.flink.runtime.jobmaster.slotpool.SlotPool          -
>     Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
>     org.apache.flink.util.FlinkException: Slot is being returned to the
>     SlotPool.
>             at
>     org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>             at
>     org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>             at
>     java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>             at
>     java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
>             at
>     java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
>             at
>     org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>             at
>     org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
>             at
>     org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
>             at
>     org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
>             at
>     org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
>             at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>             at
>     sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>             at java.lang.reflect.Method.invoke(Method.java:498)
>             at
>     org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>             at
>     org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>             at
>     org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>             at
>     org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>             at
>     akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>             at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>             at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>             at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>             at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>             at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>             at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>             at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>             at
>     scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>             at
>     scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>             at
>     scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>             at
>     scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>     Don't know if the internals of Flink are explicitly using an
>     exception for control flow, but there are several occurrences of
>     this as time goes by.
>
>     Regarding my program itself, I've achieved some progress.
>     In my program I need to run a sequence of Flink jobs, and I need
>     to take extra care to make sure no DataSet instance from job /i/ is
>     being used in an operator in job /i + 1/.
>     I believe this was generating the waiting scenarios I described in an
>     earlier email.
>     The bottom line is to be extra careful about when job executions are
>     actually triggered and to make sure that a DataSet which will need
>     to be used in different Flink jobs is available for example as a
>     file in secondary storage (possibly masked as a memory-mapping) and
>     is exclusively read from that source.
>     This means ensuring the job that originally produces a DataSet (for
>     reuse on a later job) assigns to it a DataSink for secondary storage.
>
>     I'm going to keep digging, taking this into account - I will report
>     back if I manage to fix everything or find a new problem.
>
>     Thanks again,
>
>
>
>     Miguel E. Coimbra
>     Email: [hidden email] <mailto:[hidden email]>
>
>     On 16 April 2018 at 10:26, Chesnay Schepler <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         ah yes, currently when you use that method the UI is started on
>         a random port. I'm currently fixing that in this PR
>         <https://github.com/apache/flink/pull/5814> that will be merged
>         today. For now you will have to enable logging and search for
>         something along the lines of "http://<host>:<port> was granted leadership"
>
>         Sorry for the inconvenience.
>
>         On 16.04.2018 15:04, Miguel Coimbra wrote:
>>         Thanks for the suggestions Chesnay, I will try them out.
>>
>>         However, I have already tried your suggestion with the
>>         dependency flink-runtime-web and nothing happened.
>>         If I understood you correctly, adding that dependency in the
>>         pom.xml would make it so the web front-end is running when I
>>         call the following line?
>>
>>         LocalEnvironment lenv = (LocalEnvironment)
>>         ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>
>>         I added flink-runtime-web  in my pom.xml, recompiled and
>>         launched the program but I simply got "Unable to connect" in
>>         my browser (Firefox) on localhost:8081.
>>         Performing wget on localhost:8081 resulted in this:
>>
>>         $ wget localhost:8081
>>         --2018-04-16 12:47:26--  http://localhost:8081/
>>         Resolving localhost (localhost)... ::1, 127.0.0.1
>>         Connecting to localhost (localhost)|::1|:8081... failed:
>>         Connection refused.
>>         Connecting to localhost (localhost)|127.0.0.1|:8081... failed:
>>         Connection refused.
>>
>>         It seems something was bound to localhost:8081 but the
>>         connection is not working for some reason.
>>         I probably am skipping some important detail.
>>         These are some of my dependencies:
>>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-java</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-core</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>
>>         Have you managed to get the web front-end in local mode?
>>
>>
>>         Best regards,
>>
>>         Miguel E. Coimbra
>>         Email: [hidden email]
>>         <mailto:[hidden email]>
>>
>>         On 16 April 2018 at 05:12, Chesnay Schepler
>>         <[hidden email] <mailto:[hidden email]>> wrote:
>>
>>             The thing with createLocalEnvironmentWithWebUI is that it
>>             requires flink-runtime-web to be on the classpath, which
>>             is rarely the case when running things in the IDE.
>>             It should work fine in the IDE if you add it as a
>>             dependency to your project. This should've been logged as
>>             a warning.
>>
>>             Chaining is unrelated to this issue as join operators are
>>             never chained to one another.
>>             Lambda functions are also not the issue, if they were the
>>             job would fail much earlier.
>>
>>             It is reasonable that T3 is blocked if T1 is blocked. T1
>>             gets no input hence produces no output, which now also
>>             blocks T3.
>>
>>             There are multiple possible explanations I can come up with:
>>             * the preceding operators are blocked on something or
>>             /really/ slow
>>             * the preceding operators are actually finished, but
>>             aren't shutting down due to an implementation error
>>             * a deadlock in Flink's join logic
>>             * a deadlock in Flink's network stack
>>
>>             For the first 2 we will have to consult the UI or logs.
>>             You said you were dumping the input DataSets into files,
>>             but were they actually complete?
>>
>>             A deadlock in the network stack should appear as all
>>             existing operator threads being blocked.
>>             We can probably rule out a problem with the join logic by
>>             removing the second join and trying again.
>>
>>
>>
>>             On 16.04.2018 03:10, Miguel Coimbra wrote:
>>>             Hello,
>>>
>>>             It would seem that the function which is supposed to
>>>             launch local mode with the web front-end doesn't launch
>>>             the front-end at all...
>>>             This function seems not to be doing what it is supposed
>>>             to do, if I'm not mistaken:
>>>
>>>             LocalEnvironment lenv = (LocalEnvironment)
>>>             ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>
>>>             Regarding the preceding operators, the thread dumps I got
>>>             were pointing to a specific set of operations over
>>>             DataSet instances that were passed into my function.
>>>             Below I show the code segment and put the lines where
>>>             threads are waiting in *bold*:
>>>
>>>             public static <K, VV, EV> DataSet<Edge<K, EV>>
>>>             selectEdges(final Graph<K, VV, EV> originalGraph, final
>>>             DataSet<Vertex<K, VV>> vertices) {
>>>                 return vertices
>>>                         .joinWithHuge(originalGraph.getEdges())
>>>                         .where(0).equalTo(0)
>>>             *            .with((source, edge) -> edge) // Thread 1 is blocked here*
>>>                         .returns(originalGraph.getEdges().getType())
>>>                         .join(vertices)
>>>                         .where(1).equalTo(0)
>>>             *            .with((e, v) -> e) // Thread 3 is blocked here*
>>>                         .returns(originalGraph.getEdges().getType())
>>>                         .distinct(0, 1);
>>>             }
>>>
>>>             Note: the edges inside the graph originalGraph edge
>>>             DataSet are much greater in number than the elements of
>>>             the vertices DataSet, so I believe that function is being
>>>             used correctly.
>>>
>>>             I will try testing with remote (cluster) mode to have
>>>             access to the web front-end, but I have some questions
>>>             for now:
>>>
>>>             - Is the fact that they are blocked in different
>>>             JoinOperator instances that are chained a result
>>>             of Flink's default pipeline mechanism?
>>>             - Could there be a problem stemming from the fact they
>>>             are both waiting on lambdas?
>>>             - I have tried dumping both DataSet variables
>>>             originalGraph and vertices into files (the ones being
>>>             used in this code), and they produced correct values
>>>             (non-empty files), so I don't have a clue what the
>>>             threads inside Flink's runtime are waiting on.
>>>
>>>             Thanks for the help so far, Chesnay.
>>>
>>>
>>>             Miguel E. Coimbra
>>>             Email: [hidden email]
>>>             <mailto:[hidden email]>
>>>
>>>             ---------- Forwarded message ----------
>>>
>>>                 From: Chesnay Schepler <[hidden email]
>>>                 <mailto:[hidden email]>>
>>>                 To: [hidden email] <mailto:[hidden email]>
>>>                 Cc: 
>>>                 Bcc: 
>>>                 Date: Sun, 15 Apr 2018 18:54:33 +0200
>>>                 Subject: Re: Unsure how to further debug - operator
>>>                 threads stuck on java.lang.Thread.State: WAITING
>>>                 Hello,
>>>
>>>                 Thread #1-3 are waiting for input, Thread #4 is
>>>                 waiting for the job to finish.
>>>
>>>                 To further debug this I would look into what the
>>>                 preceding operators are doing, whether they are
>>>                 blocked on something or are emitting records (which
>>>                 you can check in the UI/metrics).
>>>
>>>                 On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>>                 Hello,
>>>>
>>>>                 I am running into a situation where the Flink
>>>>                 threads responsible for my operator execution are
>>>>                 all stuck on WAITING mode.
>>>>                 Before anything else, this is my machine's spec:
>>>>
>>>>                 Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7-
>>>>                 4830  @ 2.13GHz GenuineIntel GNU/Linux
>>>>                 256 GB RAM
>>>>
>>>>                 I am running in local mode on a machine with a
>>>>                 considerable amount of memory, so perhaps that may
>>>>                 be triggering some execution edge-case?
>>>>
>>>>                 Moving on, this is my Java:
>>>>
>>>>                 openjdk version "1.8.0_151"
>>>>                 OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>>                 OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>
>>>>                 Getting back to the problem: I am currently using
>>>>                 Flink 1.5-SNAPSHOT with LocalEnvironment on this
>>>>                 large-memory machine, with parallelism set to one:
>>>>
>>>>                 Configuration conf = new Configuration();
>>>>                 LocalEnvironment lenv = (LocalEnvironment)
>>>>                 ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>                 ExecutionEnvironment env = lenv;
>>>>                 env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
>>>>                 env.setParallelism(1);
>>>>
>>>>                 This initializes the execution environment for a
>>>>                 series of sequential jobs (any data dependency
>>>>                 between jobs is flushed to disk on job /i /and read
>>>>                 back from disk into a DataSet in job /i + 1/).
>>>>                 To reiterate, I am not launching a Flink cluster, I
>>>>                 am just executing in local mode from a code base
>>>>                 compiled with Maven.
>>>>
>>>>                 I have tested this program via mvn exec:exec with
>>>>                 different values of memory (from -Xmx20000m to
>>>>                 -Xmx120000m, from 20GB to 120GB) and the result is
>>>>                 always the same: the process' memory fills up
>>>>                 completely and then the process' CPU usage drops to 0%.
>>>>                 This is strange because if it was lack of memory, I
>>>>                 would expect an OutOfMemoryError.
>>>>
>>>>                 I have debugged with IntelliJ IDEA and obtained
>>>>                 thread dumps from different executions, and realized
>>>>                 quite a few operator threads are stuck on
>>>>                 java.lang.Thread.State: WAITING.
>>>>
>>>>                 There are four major threads that I find to be in
>>>>                 this waiting state.
>>>>                 The thread dumps I obtained show me where the wait
>>>>                 calls originated:
>>>>
>>>>                 *Number 1:*
>>>>
>>>>                 "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>>>                   java.lang.Thread.State: WAITING
>>>>                       at java.lang.Object.wait(Object.java:-1)
>>>>                       at java.lang.Object.wait(Object.java:502)
>>>>                       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>                       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>                       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>                       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>                       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>                       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>                       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>                       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>                       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>                       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>                       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>                       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>                       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>                       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>                       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>>                 *Number 2:*
>>>>
>>>>                 "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>>>                   java.lang.Thread.State: WAITING
>>>>                       at java.lang.Object.wait(Object.java:-1)
>>>>                       at java.lang.Object.wait(Object.java:502)
>>>>                       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>                       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>                       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>                       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>                       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>                       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>                       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>                       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>                       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>                       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>                       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>                       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>                       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>                       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>                       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>                 *Number 3:*
>>>>
>>>>                 "Join (Join at selectEdges(GraphUtils.java:324))
>>>>                 (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>>>>                   java.lang.Thread.State: WAITING
>>>>                       at java.lang.Object.wait(Object.java:-1)
>>>>                       at java.lang.Object.wait(Object.java:502)
>>>>                       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>                       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>                       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>                       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>                       at
>>>>                 org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>                       at
>>>>                 org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>                       at
>>>>                 org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>                       at
>>>>                 org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>                       at
>>>>                 org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>                       at
>>>>                 org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>>>>                       at
>>>>                 org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>                       at
>>>>                 org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>                       at
>>>>                 org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>                       at
>>>>                 org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>                       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>                 *Number 4:*
>>>>
>>>>                 "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA
>>>>                 waiting
>>>>                   java.lang.Thread.State: WAITING
>>>>                       at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>                       at
>>>>                 java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>                       at
>>>>                 java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>>>>                       at
>>>>                 java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>>>                       at
>>>>                 java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>>>>                       at
>>>>                 java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>>                       at
>>>>                 org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>>>                       at
>>>>                 org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
>>>>                       - locked <0x23eb> (a java.lang.Object)
>>>>                       at
>>>>                 org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>>>                       at
>>>>                 org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
>>>>                       at
>>>>                 org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>                       at
>>>>                 my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
>>>>                       at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
>>>>                       at my.package.algorithm.Sample.run(Sample.java:291)
>>>>                       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>                 While I realize these dumps may not be helpful on
>>>>                 their own, they at least indicate (as far as I know)
>>>>                 that the threads are all waiting on something.
>>>>                 But if it were resource scarcity, I believe the
>>>>                 program would terminate with an exception.
>>>>                 And if it were garbage collection activity, I believe
>>>>                 the JVM process would not be at 0% CPU usage.
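
One way to test the garbage collection hypothesis directly, rather than inferring it from CPU usage, is to sample the JVM's own GC counters twice and compare them. The following is a minimal sketch using the standard `java.lang.management` API. The class name `GcActivityProbe` and its helper methods are hypothetical, and the sampling would have to run inside the stalled JVM (for example from a watchdog thread started before the job is submitted) to say anything about that process.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper: sums GC counters across all collectors of the current JVM.
class GcActivityProbe {

    /** Total number of collections so far; collectors reporting -1 (undefined) are skipped. */
    static long totalGcCount() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount();
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    /** Approximate accumulated collection time in milliseconds; -1 values are skipped. */
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long time = gc.getCollectionTime();
            if (time > 0) {
                total += time;
            }
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        long count0 = totalGcCount();
        long time0 = totalGcTimeMillis();
        Thread.sleep(1000); // sampling interval, chosen arbitrarily for this sketch
        long deltaCount = totalGcCount() - count0;
        long deltaTime = totalGcTimeMillis() - time0;
        System.out.printf("GC runs in the last second: %d, GC time: %d ms%n",
                deltaCount, deltaTime);
    }
}
```

If both deltas stay at zero while the process sits at 0% CPU, GC thrashing is unlikely to be the cause. The same counters can also be watched from outside the process with the JDK's `jstat -gcutil <pid> 1000`.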
>>>>
>>>>                 *Note: *I realize I didn't provide the user code
>>>>                 that generates the execution plan for Flink
>>>>                 and led to the contexts in which the threads are
>>>>                 waiting, but I hope it may not be necessary.
>>>>                 My problem now is that I am unsure how to proceed
>>>>                 to further debug this issue:
>>>>                 - The assigned memory is fully used, but there are
>>>>                 no exceptions about lack of memory.
>>>>                 - The CPU usage is at 0% and all threads are in
>>>>                 a waiting state, but I don't understand what signal
>>>>                 they're waiting for exactly.
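
To see which object each waiting thread is parked on, the thread dump can also be produced programmatically with lock information attached. The sketch below uses the standard `ThreadMXBean` API. The class name `WaitingThreadReport` and its methods are hypothetical, and it only reports on the JVM it runs in, so it would need to be invoked from inside the stalled process.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: lists WAITING/TIMED_WAITING threads and what they wait on.
class WaitingThreadReport {

    /** One line per waiting thread: name, state, the lock it waits on, and the lock owner if any. */
    static String describeWaitingThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // (false, false): skip locked-monitor and locked-synchronizer lists,
        // which are optional features and not needed for this report.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            Thread.State state = info.getThreadState();
            if (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
                continue;
            }
            sb.append(info.getThreadName())
              .append(" [").append(state).append("]")
              .append(" waiting on ").append(info.getLockName())      // may be null
              .append(", owner: ").append(info.getLockOwnerName())    // null if unowned
              .append('\n');
        }
        return sb.toString();
    }

    /** True if the JVM detects a cycle of threads blocked on each other's locks. */
    static boolean hasDeadlock() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = mx.isSynchronizerUsageSupported()
                ? mx.findDeadlockedThreads()          // monitors and ownable synchronizers
                : mx.findMonitorDeadlockedThreads();  // monitors only
        return ids != null;
    }

    public static void main(String[] args) {
        System.out.print(describeWaitingThreads());
        System.out.println("Deadlock detected: " + hasDeadlock());
    }
}
```

A thread in `Object.wait()` with no lock owner, as in the `SingleInputGate.getNextBufferOrEvent` frames above, is waiting for a notification rather than for a held lock, so deadlock detection would not flag it. In that case the interesting question becomes which upstream task was expected to produce the data. From outside the process, `jstack -l <pid>` prints comparable lock details.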
>>>>
>>>>                 Hoping someone might be able to give me a hint.
>>>>
>>>>                 Thank you very much for your time.
>>>>
>>>>                 Best regards,
>>>>
>>>>                 Miguel E. Coimbra
--
Nico Kruber | Software Engineer
data Artisans


