StreamingFileSink in version 1.8


StreamingFileSink in version 1.8

Yitzchak Lieberman

Hi.


I'm a bit confused:

When I launch my Flink streaming application, which writes Kafka messages to S3 as Parquet files, on EMR release 5.24 (which ships with Flink 1.8), I get the exception below. But when I install Flink 1.8 on EMR myself as a custom installation, it works.

What could explain the different behavior?


Thanks,

Yitzchak.


Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)


Re: StreamingFileSink in version 1.8

Ken Krugler
The code in HadoopRecoverableWriter is:

if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
throw new UnsupportedOperationException(
"Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
}

So one possibility is that your sink path doesn’t have the explicit hdfs://xxx protocol.

Another is that you’re in classpath hell, and your job jar contains an older version of Hadoop jars.
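
As a quick way to tell which of those two cases you're in, here is a minimal diagnostic sketch (the path and namenode host are hypothetical placeholders) that prints which Flink FileSystem implementation a given sink path resolves to, and therefore which scheme the check above will see:

import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class SinkPathCheck {

    public static void main(String[] args) throws IOException {
        // Hypothetical sink path. A path without an explicit scheme
        // (e.g. "/data/events") falls back to the configured default
        // filesystem, which may not be HDFS at all.
        Path sinkPath = new Path("hdfs://namenode:8020/data/events");

        // Print the FileSystem implementation and URI the path resolves to;
        // this determines which RecoverableWriter Flink tries to create.
        FileSystem fs = sinkPath.getFileSystem();
        System.out.println(fs.getClass().getName() + " -> " + fs.getUri());
    }
}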

— Ken



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: StreamingFileSink in version 1.8

Yitzchak Lieberman
Hi.

I found that the problem was that I didn't have flink-s3-fs-hadoop-<version>.jar in Flink's lib directory; with that jar in place I can use the 's3a' protocol.
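
For completeness, a minimal sketch of a bulk-format sink that works with that setup, assuming flink-s3-fs-hadoop-<version>.jar sits in Flink's lib/ directory; the bucket name and Avro schema below are hypothetical placeholders:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3ParquetSink {

    // Hypothetical Avro schema for the Kafka records; replace with your own.
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"}]}");

    public static StreamingFileSink<GenericRecord> buildSink() {
        // With flink-s3-fs-hadoop-<version>.jar in lib/, "s3a://" paths are
        // served by Flink's bundled S3 filesystem, which brings its own
        // recoverable writer, so the HDFS-only check in HadoopRecoverableWriter
        // from the stack trace is never reached.
        Path outputPath = new Path("s3a://my-bucket/parquet-output");

        return StreamingFileSink
                .forBulkFormat(outputPath, ParquetAvroWriters.forGenericRecord(SCHEMA))
                .build();
    }
}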
