Hello,
I have a process that works fine with Flink 0.8.1 but I decided to test it against 0.9.0-milestone-1. I have 12 task managers across 3 machines - so it's a small setup.

The process fails with the following message. It appears that it's attempting to do a shuffle in response to my join request. I checked all 3 machines and there are no issues with the hostname on any of them. But the host being reported as "localhost" makes me wonder whether I have missed something obvious.

I noticed this exception in one of the Travis CI builds, so I'm hoping it's something obvious I've missed.

06/23/2015 05:03:00 Join (Join at run(Job.java:137))(11/12) switched to RUNNING
06/23/2015 05:03:00 Join (Join at run(Job.java:176))(9/12) switched to RUNNING
06/23/2015 05:03:00 Join (Join at run(Job.java:176))(12/12) switched to RUNNING
06/23/2015 05:03:00 Join (Join at run(Job.java:137))(12/12) switched to FAILED
java.lang.Exception: The data preparation for task 'Join (Join at run(Job.java:137))' , caused an error: Connecting the channel failed: Connection refused: localhost/127.0.0.1:46229
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:472)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connecting the channel failed: Connection refused: localhost/127.0.0.1:46229
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:193)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:129)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:65)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:57)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:106)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:305)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:328)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:696)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:440)
    at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.open(NonReusingBuildSecondHashMatchIterator.java:85)
    at org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:160)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
    ... 3 more

Thanks
|
Hey Aaron,
thanks for reporting the issue.

You are right that the exception is thrown during a shuffle. The receiver initiates a TCP connection to receive all the data for the join. A failing connect usually means that the respective TaskManager is not running.

Can you check whether all expected task managers are running? You can use the web interface of the job manager for this (http://jm-address:8081).

Some further questions:
- Are you running in standalone/cluster mode (e.g. slaves file configured and bin/start-cluster.sh script used)?
- Is this reproducible?

– Ufuk
|
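A quick way to confirm the "Connection refused" symptom outside of Flink is to attempt a plain TCP connect to the address from the stack trace. The following is only a minimal sketch: the class name PortProbe and the 2-second timeout are assumptions made for illustration, and the default host and port are simply the ones that appear in the trace above. It is not part of Flink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink: probes whether a TCP port accepts connections.
class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers "connection refused", timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the address reported in the stack trace.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 46229;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

Running it from the machine that hosts the failing task, with the host and port from the exception, shows whether anything is listening at that address at all.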
Yes, the task manager continues running. I have put together a test app to demonstrate the problem and in doing so noticed some oddities. The problem manifests itself on a simple join (I originally believed it was the distinct, I was wrong).

• When the source is generated via fromCollection(), it works fine.
• When the source is generated via readCsvFile() where the file URL is of the form file:/// it fails.
• When the source is generated via JDBCInputFormat it fails.
• My real app uses the JDBCInputFormat but I converted it to work off data that might be in a file.

In all cases, I stopped the cluster and restarted the cluster. Then I ran the application twice, once to make sure the error occurred on a clean cluster and then once again on a cluster that had previously had a failed job. You can find the application at https://github.com/ajaxx/flink-examples/tree/master/FlinkErrorWithFile.

Please let me know if there is anything I can do to help.

Aaron
|
Hey Aaron,
thanks for preparing the example. I've checked it out and tried it with a similar setup (12 task managers with 1 slot each, running the job with a parallelism of 12). I couldn't reproduce the problem.

What have you configured in the "slaves" file? I think Flink does not allow you to run multiple task managers on a single machine with the startup scripts. Can you provide some information on how you start the system?

Thanks for helping out with this.

– Ufuk

On 24 Jun 2015, at 05:09, Aaron Jackson <[hidden email]> wrote:
> The problem manifests itself on a simple join (I originally believed it was the distinct, I was wrong).
> • When the source is generated via fromCollection(), it works fine.
> • When the source is generated via readCsvFile() where the file URL is of the form file:/// it fails.
> • When the source is generated via JDBCInputFormat it fails.
> • My real app uses the JDBCInputFormat but I converted it to work off data that might be in a file.
|
Thanks. My setup is actually 3 task managers x 4 slots. I played with the parallelism and found that at low values, the error did not occur. I can only conclude that there is some form of data shuffling that is occurring that is sensitive to the data source. Yes, seems a little odd to me as well. OOC, did you load the file into HDFS or use it from a local file system (e.g. file:///tmp/data.csv) - my results have shown that so far, HDFS does not appear to be sensitive to this issue.

I updated the example to include my configuration and slaves, but for brevity, I'll include the configurable bits here:

jobmanager.rpc.address: host01
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
parallelization.degree.default: 1
jobmanager.web.port: 8081
webclient.port: 8080
taskmanager.network.numberOfBuffers: 8192
taskmanager.tmp.dirs: /datassd/flink/tmp

And the slaves ...

host01
host02
host03

I did notice an extra empty line at the end of the slaves. And while I highly doubt it makes ANY difference, I'm still going to re-run with it removed.

Thanks for looking into it.

Aaron
|
Thank you for being so helpful. I've tried it with the local filesystem.

On 23 Jun 2015, at 07:11, Aaron Jackson <[hidden email]> wrote:
> I have 12 task managers across 3 machines - so it's a small setup.

Sorry for my misunderstanding. I've tried it with both 12 task managers and 3 as well now.

What's odd is that the stack trace shows that it is trying to connect to "localhost" for the remote channel although localhost is not configured anywhere. Let me think about that. ;)

– Ufuk
|
Aaron,

Can you check how the TaskManagers register at the JobManager? When you look at the 'TaskManagers' section in the JobManager's web interface (at port 8081), what does it show as the TaskManager host names? Does it list "host1", "host2", "host3"...?

Thanks,
|
That was it. host3 was showing localhost - looked a little further and it was missing an entry in /etc/hosts.

Thanks for looking into this.

Aaron
|
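The kind of check that exposed the problem (a host name that is missing from /etc/hosts or that maps to a loopback address) can also be run by hand on each machine. The sketch below is an illustration only: the class HostnameCheck and its Resolution enum are hypothetical names, and the default host names are the ones from the slaves file quoted earlier in the thread. It only reports how the local resolver sees each name and makes no claim about how Flink itself resolves addresses.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical diagnostic, not part of Flink: shows how host names resolve on this machine.
class HostnameCheck {

    enum Resolution { UNRESOLVED, LOOPBACK, NON_LOOPBACK }

    /** Resolves hostName with the local resolver (including /etc/hosts) and classifies the result. */
    static Resolution classify(String hostName) {
        try {
            InetAddress address = InetAddress.getByName(hostName);
            return address.isLoopbackAddress() ? Resolution.LOOPBACK : Resolution.NON_LOOPBACK;
        } catch (UnknownHostException e) {
            // No entry in /etc/hosts and no answer from DNS.
            return Resolution.UNRESOLVED;
        }
    }

    public static void main(String[] args) {
        // Defaults are the host names from the slaves file in this thread.
        String[] hosts = args.length > 0 ? args : new String[] {"host01", "host02", "host03"};
        for (String host : hosts) {
            System.out.println(host + " -> " + classify(host));
        }
    }
}
```

On a healthy cluster every worker name should come back as NON_LOOPBACK on every machine; an UNRESOLVED or LOOPBACK result for a worker points at the same kind of /etc/hosts gap described above.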
Nice!

TaskManagers need to announce where they listen for connections. We do not yet block "localhost" as an acceptable address, so as not to prohibit local test setups.

There are some routines that try to select an interface that can communicate with the outside world.

Is host3 running on the same machine as the JobManager? Or did you experience a long delay until TaskManager 3 was registered?

Thanks for helping us debug this,
Stephan
|
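To make the idea of "selecting an interface that can communicate with the outside world" concrete, here is a rough sketch of one possible heuristic: walk the local network interfaces and take the first address that is neither loopback nor link-local. This is explicitly not Flink's actual routine, which is not shown in this thread; the class AddressPicker and its method names are hypothetical and only illustrate the general approach.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration, not Flink code: picks an address other machines could plausibly reach.
class AddressPicker {

    /** Returns the first candidate that is not loopback, link-local, or the wildcard address. */
    static Optional<InetAddress> pickExternal(List<InetAddress> candidates) {
        for (InetAddress address : candidates) {
            if (!address.isLoopbackAddress()
                    && !address.isLinkLocalAddress()
                    && !address.isAnyLocalAddress()) {
                return Optional.of(address);
            }
        }
        return Optional.empty();
    }

    /** Collects the addresses of all network interfaces that are currently up. */
    static List<InetAddress> localAddresses() throws SocketException {
        List<InetAddress> result = new ArrayList<>();
        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
        if (interfaces == null) {
            return result; // older JDKs may return null when no interface is found
        }
        for (NetworkInterface nif : Collections.list(interfaces)) {
            if (nif.isUp()) {
                result.addAll(Collections.list(nif.getInetAddresses()));
            }
        }
        return result;
    }

    public static void main(String[] args) throws SocketException {
        Optional<InetAddress> picked = pickExternal(localAddresses());
        System.out.println(picked.isPresent()
                ? "would announce: " + picked.get().getHostAddress()
                : "only loopback or link-local addresses found");
    }
}
```

A heuristic like this never returns 127.0.0.1, which is why a setup that must also support local test clusters cannot rely on it alone and still has to accept "localhost" in some cases.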
So the JobManager was running on host1. This also explains why I didn't see the problem until I had asked for a sizeable degree of parallelism since it probably never assigned a task to host3.

Thanks for your help
|
That makes perfect sense, thanks!
|