And it works now.

The problem was that I was setting jobmanager.rest.address instead of jobmanager.rpc.address, which was creating the actor system on the local host. I am still periodically getting the messages below in the job manager, but they seem to be harmless:

ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at java.lang.Thread.run(Thread.java:748)

2019-02-22 16:49:22,016 WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Unhandled exception
java.io.IOException: Connection reset by peer
    (same stack trace as above)
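For reference, a minimal sketch of the relevant flink-conf.yaml keys (the host name "flink-jobmanager" is a made-up placeholder, not from the thread; in Flink 1.7 the REST endpoint is configured via the separate rest.* keys):

    # flink-conf.yaml sketch -- host name is hypothetical
    jobmanager.rpc.address: flink-jobmanager   # where the JobManager actor system is reachable
    jobmanager.rpc.port: 6123
    rest.address: flink-jobmanager             # REST / web UI endpoint
    rest.port: 8081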
In reply to this post by Dawid Wysakowicz-2
And it's working now.
Sorry about all the confusion. The root cause of the problem was that in the "standard" Flink image the directory /opt/flink/log does not have write permissions, and as a result the local cluster was never created.
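A minimal Dockerfile sketch of one way to work around the permission issue (the image tag matches the versions in the thread; the "flink" user is an assumption about the official image, not something stated here):

    FROM flink:1.7.2
    USER root
    # give the flink user write access to the log directory so the
    # local cluster can actually start
    RUN chown -R flink:flink /opt/flink/log
    USER flink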
And there are more findings and concerns.
It turns out that execution depends on where my jar is located. If I put it in the lib directory (which I need in order to run bin/standalone-job.sh), it fails with the same error. Currently I have the following in this directory:

    fdp-flink-taxiride-assembly-2.0.0.jar
    flink-metrics-prometheus-1.7.2.jar
    flink-queryable-state-runtime_2.11-1.7.2.jar
    log4j-1.2.17.jar
    flink-dist_2.11-1.7.2.jar
    flink-python_2.11-1.7.2.jar
    flink-table_2.11-1.7.2.jar
    slf4j-log4j12-1.7.15.jar

None of them seems to directly contain Kafka (except for my jar, fdp-flink-taxiride-assembly-2.0.0.jar). But if I move my jar to any other directory it works fine.
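For context, a sketch of how the job cluster is typically started when the job jar sits in lib (the job class name is a hypothetical example, not from the thread):

    # standalone-job.sh picks the job class up from the classpath (lib),
    # which is why the fat jar has to live there
    bin/standalone-job.sh start-foreground --job-classname com.example.TaxiRideJob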
Any progress on this one?
Hi Boris,
1. I assume you’ve tried marking the Kafka dependencies as “provided”, and that also failed, yes?
2. Did you try shading the Kafka dependencies? (see the sketch below)

— Ken

PS - Moving this conversation back to the user list.
--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
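For concreteness, a sketch of what Ken's two suggestions could look like in an sbt build (assumes the sbt-assembly plugin is enabled; module names and versions mirror the thread, but treat this as an illustration, not a verified build file):

    // build.sbt -- option 1: mark the connector "provided" so it is
    // not bundled into the fat jar at all
    libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.7.2" % "provided"

    // option 2: relocate the Kafka packages inside the fat jar (sbt-assembly)
    assemblyShadeRules in assembly := Seq(
      ShadeRule.rename("org.apache.kafka.**" -> "shadedkafka.@1").inAll
    )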
I am not including Kafka per se - I am using the Flink Kafka connector, which pulls in Kafka.
I tried excluding it and it failed. I can’t quite shade Kafka, and it does not seem like the right thing to do anyway. I tried to see where it is loaded from, and it’s my jar, but I see it is being loaded twice when the jar is in lib. You guys should be able to easily reproduce it on any Kafka sample that you have.
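One generic JVM diagnostic for checking which jar a class is loaded from (not something from the thread; the Kafka class is just an example):

    // Scala sketch: print which jar a class was loaded from
    object WhereIsKafka {
      def main(args: Array[String]): Unit = {
        val clazz = Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer")
        // getCodeSource can be null for bootstrap classes; Kafka classes come from a jar
        println(clazz.getProtectionDomain.getCodeSource.getLocation)
      }
    }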
Hi Boris,

I was not able to reproduce your problem. I am using the docker-entrypoint.sh and Dockerfile from the flink-container project [1]. The content of my fat jar:

    flinkShadowJar
    +--- project :fe-flink-common
    +--- org.apache.flink:flink-metrics-influxdb_2.11:1.8-SNAPSHOT
    +--- org.apache.flink:flink-metrics-slf4j:1.7.2
    +--- org.apache.flink:flink-connector-kafka_2.11:1.7.2
    |    +--- org.apache.flink:flink-connector-kafka-base_2.11:1.7.2
    |    \--- org.apache.kafka:kafka-clients:2.0.1
    |         +--- org.lz4:lz4-java:1.4.1
    |         \--- org.xerial.snappy:snappy-java:1.1.7.1
    \--- org.apache.flink:flink-s3-fs-presto:1.7.2

In the standard Docker image the job.jar is "only" linked into the Flink lib directory. I also tested it with a modified Dockerfile, which copies the job.jar into the lib directory of Flink:

    bash-4.4$ ls -lia lib
    total 135140
      611154 drwxrwxr-x 1 flink flink     4096 Mar  6 10:20 .
      606092 drwxrwxr-x 1 flink flink     4096 Feb 11 15:38 ..
    26477916 -rw-r--r-- 1 flink flink 93445474 Feb 11 15:38 flink-dist_2.11-1.7.2.jar
    26477917 -rw-r--r-- 1 flink flink   141937 Feb 11 15:37 flink-python_2.11-1.7.2.jar
    26477525 -rw-r--r-- 1 flink flink 44278062 Mar  6 10:20 job.jar
    26477918 -rw-rw-r-- 1 flink flink   489884 Feb 11 14:32 log4j-1.2.17.jar
    26477919 -rw-rw-r-- 1 flink flink     9931 Feb 11 14:32 slf4j-log4j12-1.7.15.jar

This also works. In that case you only need to make sure that the job.jar is owned by the flink user, by explicitly changing the ownership in the Dockerfile. Given your description of your Dockerfile, I suspect this is the problem (see the sketch below).

Cheers,

Konstantin

On Wed, Feb 27, 2019 at 11:19 PM Boris Lublinsky <[hidden email]> wrote:
--
Konstantin Knauf | Solutions Architect
+49 160 91394525

Follow us @VervericaData

--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
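A minimal sketch of the copy-plus-chown approach Konstantin describes (paths, jar name, and image tag are assumptions, not taken from his actual Dockerfile):

    FROM flink:1.7.2
    # copy the fat jar into Flink's lib directory and make sure the
    # flink user owns it
    COPY --chown=flink:flink job.jar /opt/flink/lib/job.jar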
Thanks Konstantin,

A couple of questions:
1. Is your fat jar in the lib directory? If it is not, it works fine.
2. The error occurs only when you start pushing traffic through Kafka.
Hi Boris,

In the first case it is linked into the lib directory; in the second case it is located in the lib directory. Both work. My job uses both the Kafka producer and the Kafka consumer, and both are processing records.

Best,

Konstantin

On Wed, Mar 6, 2019 at 3:16 PM Boris Lublinsky <[hidden email]> wrote:
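For reference, the "linking" variant could be as simple as the following (a sketch; the paths are hypothetical and the exact mechanism in flink-container's docker-entrypoint.sh may differ):

    # symlink the job jar into Flink's lib directory instead of copying it
    ln -s /opt/job.jar /opt/flink/lib/job.jar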