Hi. I'm a bit confused: when launching my Flink streaming application on EMR release 5.24 (which ships Flink 1.8) to write Kafka messages to S3 as Parquet files, I get the exception below, but when I install Flink 1.8 on EMR myself as a custom installation, it works. What could explain the different behavior? Thanks, Yitzchak.

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
The code in HadoopRecoverableWriter is:
if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) { throw new UnsupportedOperationException( "Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer"); } So one possibility is that your sink path doesn’t have the explicit <a href="hdfs://xxx" class="">hdfs://xxx protocol. Another is that you’re in classpath hell, and your job jar contains an older version of Hadoop jars. — Ken
--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
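To illustrate Ken's first point, here is a minimal sketch (the bucket name and class name are hypothetical) that prints which Flink FileSystem implementation a given path resolves to. Flink picks the implementation from the path's URI scheme, and the check above is only reached when the path resolves to the Hadoop-backed filesystem:

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class SchemeCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical sink path; substitute the one your job uses.
        Path sinkPath = new Path("s3a://my-bucket/output");

        // Flink resolves the FileSystem from the URI scheme. If an S3
        // path ends up on the plain Hadoop wrapper, the hdfs-only check
        // in HadoopRecoverableWriter will reject it.
        FileSystem fs = sinkPath.getFileSystem();
        System.out.println(fs.getClass().getName()
                + " handles scheme '" + sinkPath.toUri().getScheme() + "'");
    }
}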
Hi. I found that the problem was that I didn't have flink-s3-fs-hadoop-<version>.jar in Flink's lib directory; with that jar in place I can use the 's3a' protocol.
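For reference, a minimal sketch of the resulting sink setup (the schema, bucket name, and helper method are hypothetical). With flink-s3-fs-hadoop in lib, s3a:// paths are served by Flink's bundled S3 filesystem, which provides its own recoverable writer and so never reaches the HDFS-only check quoted above:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetS3Sink {

    // Attaches a bulk-format (Parquet) sink to a stream of Avro records.
    // The s3a:// scheme is handled by flink-s3-fs-hadoop once that jar
    // is in Flink's lib/ directory.
    static void attachSink(DataStream<GenericRecord> kafkaStream, Schema schema) {
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("s3a://my-bucket/parquet-out"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .build();
        kafkaStream.addSink(sink);
    }
}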