Hi,
I've just tried to run the basic example for Apache Flink on an Apple Mac Pro with the new M1 processor. I only need this for development purposes; the actual pipeline is going to run on a Linux server later on, so I would not mind if it only runs via the Rosetta compatibility layer. Unfortunately, it failed with the following stack trace:
Without Rosetta I get:

./bin/flink run ./examples/streaming/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/muthmann/Development/Flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'Streaming WordCount'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Streaming WordCount'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.rest.RestClient$ClientHandler.exceptionCaught(RestClient.java:613)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
    at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireExceptionCaught(CombinedChannelDuplexHandler.java:424)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:92)
    at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$1.fireExceptionCaught(CombinedChannelDuplexHandler.java:145)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:143)
    at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.exceptionCaught(CombinedChannelDuplexHandler.java:231)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    ... 29 more
Caused by: java.util.concurrent.CompletionException: java.io.IOException: Connection reset by peer
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
    at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
    at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
    ... 27 more
Caused by: java.io.IOException: Connection reset by peer
    at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
    at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
    at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
    at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
    ... 7 more

Funny thing is, I am able to execute a Flink pipeline from within my IntelliJ IDE, so I thought it should be possible somehow.
So my question is: did anyone get this to run? Is there any chance of executing Flink jobs on Apple Silicon? I've posted the same question on StackOverflow, so if you'd like some points there, feel free to head over and post a reply. ;)

Thanks and regards
Klemens Muthmann
Hey Klemens,

I'm sorry that you are running into this. It looks like you are the first (of probably many people) to use Flink on an M1 chip.

If you are up for it, we would really appreciate a fix for this issue as a contribution to Flink. Maybe you can distill the problem into an integration test, so that you can look into fixes right from your IDE. It seems that the RestClient is causing the problems. The client is used by the command line interface to upload the job to the cluster; that upload does not happen when executing the job from the IDE.

My first guess is that a newer netty version might be required? Or maybe there's some DEBUG log output that's helpful in understanding the issue?

On Tue, Apr 13, 2021 at 5:34 PM Klemens Muthmann <[hidden email]> wrote:
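Since the failure surfaces inside the RestClient during job upload, one way to narrow things down (a hypothetical diagnostic sketch, not part of Flink; it assumes the JobManager's default REST port 8081 and uses only the plain JDK) is to check whether the REST endpoint answers at all, outside of Flink's netty-based client stack:

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Minimal probe of the Flink JobManager REST endpoint. If this succeeds but
// `flink run` still fails, the problem is more likely in the shaded netty
// client than in basic networking on the M1 machine.
public class RestProbe {
    public static void main(String[] args) throws Exception {
        // Default REST address of a local standalone cluster (assumption).
        String endpoint = args.length > 0 ? args[0] : "http://localhost:8081/config";
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
            conn.setConnectTimeout(3000);
            conn.setReadTimeout(3000);
            // getResponseCode() actually opens the connection and reads the status line.
            System.out.println("REST endpoint reachable, HTTP " + conn.getResponseCode());
        } catch (IOException e) {
            System.out.println("REST endpoint not reachable: " + e);
        }
    }
}
```

If the probe reports the endpoint as reachable while the CLI still dies with "Connection reset by peer", that points at the shaded netty transport rather than the cluster itself.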
Hi,
Since kindergarten time is shortened due to the pandemic, I only get four hours of work into each day, and I am supposed to do eight. So unfortunately I will not be able to develop a fix at the moment. -.-

I am happy to provide any debug log you need, or to test adaptations and provide fixes as pull requests. But I will sadly have no time to do any research into the problem myself. :(

So for now I guess I will be using one of our Linux servers to test the Flink pipelines until Apple Silicon is supported. Nevertheless, thanks for your answer. If there is anything I can provide to narrow down the problem, I am happy to help.

Regards
Klemens
Hi,

a DEBUG log of the client would indeed be nice. Can you adjust the file conf/log4j-cli.properties to the following contents (basically TRACE logging with netty logs enabled)?

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30

rootLogger.level = TRACE
rootLogger.appenderRef.file.ref = FileAppender

# Log all infos in the given file
appender.file.name = FileAppender
appender.file.type = FILE
appender.file.append = false
appender.file.fileName = ${sys:log.file}
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Log output from org.apache.flink.yarn to the console. This is used by the
# CliFrontend class when using a per-job YARN cluster.
logger.yarn.name = org.apache.flink.yarn
logger.yarn.level = INFO
logger.yarn.appenderRef.console.ref = ConsoleAppender
logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
logger.yarncli.level = INFO
logger.yarncli.appenderRef.console.ref = ConsoleAppender
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.hadoop.appenderRef.console.ref = ConsoleAppender

# Make sure hive logs go to the file.
logger.hive.name = org.apache.hadoop.hive
logger.hive.level = INFO
logger.hive.additivity = false
logger.hive.appenderRef.file.ref = FileAppender

# Log output from org.apache.flink.kubernetes to the console.
logger.kubernetes.name = org.apache.flink.kubernetes
logger.kubernetes.level = INFO
logger.kubernetes.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# suppress the warning that hadoop native libraries are not loaded (irrelevant for the client)
logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
logger.hadoopnative.level = OFF

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
#logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
#logger.netty.level = OFF

Then submit a job locally and send me the respective log file (the one containing "client" in its file name). Thanks a lot, and stay healthy through the pandemic!

Best,
Robert

On Thu, Apr 15, 2021 at 9:12 PM Klemens Muthmann <[hidden email]> wrote:
Hi,
I've attached two log files. One is from a run without the Rosetta 2 compatibility layer, the other is with it. As I said, it would be great if everything worked without Rosetta 2, but for a start it might be sufficient to make it work with the compatibility layer.

Regards
Klemens
Attachments: flink-muthmann-client-KlemensMac.local.log_native (84K), flink-muthmann-client-KlemensMac.local.log_rosetta (84K)
Thanks a lot for the logs. I filed a ticket to track the issue: https://issues.apache.org/jira/browse/FLINK-22331 I hope somebody with M1 hardware will soon have time to look into it.

On Fri, Apr 16, 2021 at 11:02 AM Klemens Muthmann <[hidden email]> wrote:
Have you tried running it with explicit input and output arguments?
for example: ./bin/flink run /Users/abubakarsiddiqurrahman/Documents/flink/examples/streaming/WordCount.jar --input file:///Users/abubakarsiddiqurrahman/Documents/flink/examples/streaming/input.txt --output file:///Users/abubakarsiddiqurrahman/Documents/flink/examples/streaming/streamingoutput
Thank you

On Sat, Apr 17, 2021 at 1:17 PM Robert Metzger <[hidden email]> wrote:
Regards,
Abu Bakar Siddiqur Rahman