StreamingFileSink closed file exception

Posted by Billy Bain on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/StreamingFileSink-closed-file-exception-tp40291.html

I am new to Flink and am trying to process a file and write it out formatted as JSON. 

This is a much simplified version. 

public class AndroidReader {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        DataStreamSource<String> lines =  env.readTextFile("file:///path/to/file/input.json");

        SingleOutputStreamOperator<AndroidData> android = lines.map(new AndroidDataMapper());
        StreamingFileSink<AndroidData> sink =  StreamingFileSink.forRowFormat(new Path("file:///path/to/file/output"), new AndroidDataEncoder() )
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("filtered").withPartSuffix("json").build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
                .build();
        android.addSink(sink);
        env.execute("Android");
    }
}

@JsonIgnoreProperties(ignoreUnknown = true)
public class AndroidData {
    public AndroidData() {
    }
    private String packageName;
    public String getPackageName() {
        return packageName;
    }
    public void setPackageName(String packageName) {
        this.packageName = packageName;
    }
}
public class AndroidDataMapper implements MapFunction<String, AndroidData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public AndroidData map(String value) throws Exception {
        return objectMapper.readValue(value, AndroidData.class);
    }
}
AndroidDataEncoder class:
public class AndroidDataEncoder implements Encoder<AndroidData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(AndroidData element, OutputStream stream) throws IOException {
        objectMapper.writeValue(stream, element);
    }
}

The issue is that I get an ClosedChannelException. I see the folder get created, but then no files are written to it. 
 
java.nio.channels.ClosedChannelException
    at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
    at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
    at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnEvent(DefaultRollingPolicy.java:76)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

Any help would be appreciated. Thanks!

--
Wayne D. Young
aka Billy Bob Bain
[hidden email]