Hello! We have a real-time streaming workflow that had been running for about 2.5 weeks. Since yesterday we have been getting the exception below from (random) taskmanagers, and the job has begun to fail/restart every hour or so. The job does recover after each restart, but sometimes it takes more time to recover than allowed in our environment. On a few occasions, it took more than a few restarts to fully recover. Can you provide some insight into what this error means and what we can do to prevent it in the future? Thank you!

ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1140)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)
I forgot to mention: this is Flink 1.10.

-K

On Mon, Dec 7, 2020 at 5:08 PM Kye Bae <[hidden email]> wrote:
Hi Kye,

Almost for sure this error is not the primary cause of the failure. This error means that the node reporting it has detected some fatal failure on the other side of the wire (connection reset by peer), but the original error is somehow too slow or unable to propagate to the JobManager before this secondary exception. Something else must have failed or crashed first, so you should look for that something. This something can be:

1. A TaskManager on the other end has crashed with some error - please look for errors or warnings in the other TaskManagers' logs
2. OOM or some other JVM failure - again, please look at the logs on the other machines (maybe system logs)
3. Some OS failure - please look at the system logs on the other machines
4. Some hardware failure (restart / crash)
5. Network problems

Piotrek

On Mon, Dec 7, 2020 at 23:31 Kye Bae <[hidden email]> wrote:
Hello, Piotr. Thank you.

This is an error logged to the taskmanager just before it became "lost" to the jobmanager (i.e., it was reported as "lost" in the jobmanager log just before the job restart). In what context would this particular error (not the root cause you referred to) be thrown from a taskmanager? E.g., at any point in the pipeline that involves communicating with other non-collocated tasks running on other taskmanagers? Or with the jobmanager?

-K

On Tue, Dec 8, 2020 at 3:19 AM Piotr Nowojski <[hidden email]> wrote:
Hi,

This exception looks like it was thrown by a downstream Task/TaskManager when trying to read a message/packet from some upstream Task/TaskManager, and that connection between the two TaskManagers was reset (closed abruptly). So it's the case:

> involves communicating with other non-collocated tasks running on other taskmanagers

Piotrek

On Tue, Dec 8, 2020 at 18:56 Kye Bae <[hidden email]> wrote:
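The data exchange described above runs over Flink's (shaded) netty stack between TaskManagers, and a handful of flink-conf.yaml options govern those connections. The sketch below is for orientation only, assuming Flink 1.10 option names; the values are the documented defaults as far as known, and tuning them is not presented as a fix for the reset itself:

# Inter-TaskManager data-plane (netty) settings - illustrative, Flink 1.10 option names.
taskmanager.network.netty.transport: auto                # nio, epoll, or auto
taskmanager.network.netty.client.connectTimeoutSec: 120  # connection setup timeout (seconds)
taskmanager.network.request-backoff.initial: 100         # initial partition-request retry backoff (ms)
taskmanager.network.request-backoff.max: 10000           # max backoff before the request fails (ms)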
Hello, Piotr. Thanks again for your continued support.

We did look through the log files from the other taskmanagers for any additional errors at or around the time of the said exception but haven't found any. However, we do see INFO-level "Direct memory stats" entries like this one (Used Memory is 2 bytes greater than Total Capacity, but there are no errors around it):

Direct memory stats: Count: 32968, Total Capacity: 1104834342, Used Memory: 1104834344

Could this be an indication of a problem? Or is this normal? We see no other errors, memory-related or otherwise, around these entries.

Based on the Flink 1.10 memory model attached below, we have these Flink taskmanager memory options (16 GB total physical memory for each taskmanager). I am not aware of a Flink option to set the "direct memory" for Flink taskmanagers, and I don't know how Flink derives the total capacity for it or whether ~1 GB is appropriate.

taskmanager.memory.task.off-heap.size: 1536m
taskmanager.memory.managed.size: 3g
taskmanager.memory.task.heap.size: 6g
taskmanager.memory.jvm-metaspace.size: 1536m
taskmanager.memory.jvm-overhead.max: 2816m

On Tue, Dec 8, 2020 at 3:57 PM Piotr Nowojski <[hidden email]> wrote:
[Attachment: Screen Shot 2020-12-08 at 5.44.17 PM.png (117K) - Flink 1.10 TaskManager memory model diagram]
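On the question above about where the direct memory capacity comes from: in Flink 1.10 the TaskManager's JVM direct memory limit (-XX:MaxDirectMemorySize) is derived from framework off-heap + task off-heap + network memory, each with its own flink-conf.yaml option. A minimal sketch follows, assuming the 1.10 option names; only task.off-heap is quoted in this thread, so the other values are the 1.10 defaults and are illustrative rather than the poster's actual settings:

# Components of the TaskManager's direct memory limit in Flink 1.10:
# MaxDirectMemorySize = framework off-heap + task off-heap + network memory.
taskmanager.memory.framework.off-heap.size: 128m   # 1.10 default
taskmanager.memory.task.off-heap.size: 1536m       # value from this thread
# Network (shuffle) memory: a fraction of total Flink memory, bounded by min/max.
taskmanager.memory.network.fraction: 0.1           # 1.10 default
taskmanager.memory.network.min: 64m                # 1.10 default
taskmanager.memory.network.max: 1g                 # 1.10 default

Back-of-the-envelope only, and assuming the defaults above apply: the direct limit here would be roughly 128m + 1536m + 1g, about 2.6 GB, so the ~1.03 GB "Total Capacity" in the log line would sit well under it; that figure is plausibly dominated by the network buffer pool, which Flink allocates eagerly as direct buffers at TaskManager startup.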
Hi,

At first glance I cannot find anything wrong with those settings. If it was some memory configuration problem that caused this error, I guess it would be visible as an exception somewhere.

It's unlikely to be a GC issue: if some machine froze and stopped responding for a longer period of time, I don't think it would cause "connection reset by peer" on the other end. But have you tried looking into this? Have you enabled GC stats logging and checked that there are no problems? Also, have you looked at the stdout/stderr of the TaskManagers and the system logs?

Are you using a standalone cluster to which you are submitting a job (1)? Or are you using a per-job cluster (spawning TaskManagers on demand when submitting a job) (2)? In both cases you can also carefully track the JobManager log and try to find whether there is some other exception, error, or connection-lost message from some OTHER TaskManager (different than the one that threw "connection reset by peer"). But in case of (2), such a connection loss might not be detected by the JobManager in time, before the cluster is shut down (due to detection of the "connection reset by peer" error).

If you cannot find anything suspicious (errors, some longer periods of inactivity) in the JobManager logs or other TM logs around the timestamp when "connection reset by peer" occurred, and all of the other TMs were still working correctly after the "connection reset by peer" happened, it would mean a couple of things:
- it's probably not a problem with Flink itself, but rather with the environment (unstable network) - you could try searching for solutions to "connection reset by peer" in general, not necessarily just in the Flink context
- it wasn't a total machine/TaskManager loss; it was rather a single network connection lost between just two TaskManagers

Best,
Piotrek

On Wed, Dec 9, 2020 at 00:03 Kye Bae <[hidden email]> wrote:
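As a concrete starting point for the GC-logging suggestion above, here is a minimal sketch for turning on GC logs on the TaskManagers, assuming Flink 1.10 on Java 8; env.java.opts.taskmanager passes JVM flags to the TaskManager processes, and the log path is illustrative:

# Enable verbose GC logging on TaskManager JVMs (HotSpot/Java 8 flags; the path is illustrative).
env.java.opts.taskmanager: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/var/log/flink/taskmanager-gc.log

Long "Total time for which application threads were stopped" entries would point at GC pauses or other safepoint stalls; their absence would strengthen the case for an environment or network cause as discussed above.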