StreamingFileSink closed file exception

StreamingFileSink closed file exception

Billy Bain
I am new to Flink and am trying to process a file and write it out formatted as JSON. 

This is a much simplified version. 

public class AndroidReader {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        DataStreamSource<String> lines =  env.readTextFile("file:///path/to/file/input.json");

        SingleOutputStreamOperator<AndroidData> android = lines.map(new AndroidDataMapper());
        StreamingFileSink<AndroidData> sink =  StreamingFileSink.forRowFormat(new Path("file:///path/to/file/output"), new AndroidDataEncoder() )
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("filtered").withPartSuffix(".json").build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
                .build();
        android.addSink(sink);
        env.execute("Android");
    }
}

@JsonIgnoreProperties(ignoreUnknown = true)
public class AndroidData {
    public AndroidData() {
    }
    private String packageName;
    public String getPackageName() {
        return packageName;
    }
    public void setPackageName(String packageName) {
        this.packageName = packageName;
    }
}
public class AndroidDataMapper implements MapFunction<String, AndroidData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public AndroidData map(String value) throws Exception {
        return objectMapper.readValue(value, AndroidData.class);
    }
}
AndroidDataEncoder class:
public class AndroidDataEncoder implements Encoder<AndroidData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(AndroidData element, OutputStream stream) throws IOException {
        objectMapper.writeValue(stream, element);
    }
}

The issue is that I get a ClosedChannelException. The output folder gets created, but no files are written to it.
 
java.nio.channels.ClosedChannelException
    at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
    at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
    at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnEvent(DefaultRollingPolicy.java:76)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

Any help would be appreciated. Thanks!

--
Wayne D. Young
aka Billy Bob Bain
[hidden email]
Re: StreamingFileSink closed file exception

Yun Gao
Hi Billy,
    
    StreamingFileSink does not expect the Encoder to close the stream that is passed to its encode method. However, ObjectMapper closes the target stream at the end of writeValue by default. That is why the next element fails with ClosedChannelException when the rolling policy asks the part file for its size. I think you could either disable the close action on the ObjectMapper, or change the encode implementation to

 objectMapper.writeValue(new CloseShieldOutputStream(stream), element);

so that the underlying stream is not actually closed.
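
To illustrate the close-shield idea without extra dependencies, here is a minimal sketch that uses only the JDK. NonClosingOutputStream, TrackingStream and encodeAndClose are hypothetical names made up for this example; encodeAndClose merely stands in for any writer that closes its target, as ObjectMapper.writeValue does by default. In a real job you would more likely use CloseShieldOutputStream from Apache Commons IO as above, or, as far as I know, turn off Jackson's JsonGenerator.Feature.AUTO_CLOSE_TARGET on the ObjectMapper.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

/** Hypothetical wrapper: forwards writes, but close() only flushes the wrapped stream. */
class NonClosingOutputStream extends FilterOutputStream {
    NonClosingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Forward whole arrays; FilterOutputStream would otherwise write byte by byte.
        out.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
        // Deliberately leave the wrapped stream open; its owner decides when to close it.
        flush();
    }
}

/** Hypothetical stand-in for the sink's part-file stream; it only records close() calls. */
class TrackingStream extends ByteArrayOutputStream {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

class CloseShieldDemo {
    /** Hypothetical encoder body that closes its target when it is done writing. */
    static void encodeAndClose(String record, OutputStream target) {
        try (Writer writer = new OutputStreamWriter(target, StandardCharsets.UTF_8)) {
            writer.write(record);
            writer.write('\n'); // one record per line
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Without a shield, the first record closes the shared stream.
        TrackingStream unshielded = new TrackingStream();
        encodeAndClose("{\"packageName\":\"a\"}", unshielded);
        System.out.println("closed without shield: " + unshielded.closed);

        // With a shield per call, the shared stream stays open across records.
        TrackingStream shielded = new TrackingStream();
        encodeAndClose("{\"packageName\":\"a\"}", new NonClosingOutputStream(shielded));
        encodeAndClose("{\"packageName\":\"b\"}", new NonClosingOutputStream(shielded));
        System.out.println("closed with shield: " + shielded.closed);
        System.out.print(new String(shielded.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Wrapping the stream on every encode call is cheap, because the wrapper holds no buffer of its own. Also note that ObjectMapper.writeValue does not append a line separator, so if you want one JSON object per line the encoder probably needs to write a newline after each element, as the sketch does.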
------------------Original Mail ------------------
Sender:Billy Bain <[hidden email]>
Send Date:Thu Dec 24 22:32:06 2020
Recipients:User <[hidden email]>
Subject:StreamingFileSink closed file exception