I am new to Flink and am trying to process a file and write it out formatted as JSON. This is a much simplified version.

public class AndroidReader {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        DataStreamSource<String> lines = env.readTextFile("file:///path/to/file/input.json");
        SingleOutputStreamOperator<AndroidData> android = lines.map(new AndroidDataMapper());

        StreamingFileSink<AndroidData> sink = StreamingFileSink
            .forRowFormat(new Path("file:///path/to/file/output"), new AndroidDataEncoder())
            .withOutputFileConfig(OutputFileConfig.builder()
                .withPartPrefix("filtered")
                .withPartSuffix("json")
                .build())
            .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .withMaxPartSize(1024 * 1024 * 1024)
                .build())
            .build();

        android.addSink(sink);
        env.execute("Android");
    }
}

public class AndroidDataMapper implements MapFunction<String, AndroidData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public AndroidData map(String value) throws Exception {
        return objectMapper.readValue(value, AndroidData.class);
    }
}

@JsonIgnoreProperties(ignoreUnknown = true)
public class AndroidData {

    public AndroidData() { }

    private String packageName;

    public String getPackageName() { return packageName; }

    public void setPackageName(String packageName) { this.packageName = packageName; }
}

AndroidDataEncoder class:

public class AndroidDataEncoder implements Encoder<AndroidData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(AndroidData element, OutputStream stream) throws IOException {
        objectMapper.writeValue(stream, element);
    }
}

The issue is that I get a ClosedChannelException. I see the folder get created, but then no files are written to it.

java.nio.channels.ClosedChannelException
    at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
    at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnEvent(DefaultRollingPolicy.java:76)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

Any help would be appreciated. Thanks!

Hi Billy,

StreamingFileSink does not expect the Encoder to close the stream that is passed to the encode method. However, ObjectMapper closes the stream at the end of writeValue, so the next time the sink touches the stream (here, when the rolling policy asks for the current part size) it fails with ClosedChannelException. I think you could either disable the close action on the ObjectMapper, or change the encode implementation to

    objectMapper.writeValue(new CloseShieldOutputStream(stream), element);

so that the underlying stream is not actually closed.
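
To illustrate the close-shield idea without any extra dependencies, here is a minimal sketch using only java.io. All class names in it (NonClosingOutputStream, TrackingOutputStream, CloseShieldDemo) are hypothetical, writeAndClose is only a stand-in for a serializer that closes its target the way ObjectMapper.writeValue does by default, and the trailing newline per record is my own assumption, not something the sink requires.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical wrapper: forwards writes, but close() only flushes,
// so the wrapped stream stays open for its real owner (the sink).
final class NonClosingOutputStream extends FilterOutputStream {
    NonClosingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // delegate in bulk instead of byte by byte
    }

    @Override
    public void close() throws IOException {
        flush(); // deliberately do not call out.close()
    }
}

// Stand-in for the stream the sink hands to encode(); records close() calls.
final class TrackingOutputStream extends ByteArrayOutputStream {
    boolean closed = false;

    @Override
    public void close() throws IOException {
        closed = true;
        super.close();
    }
}

final class CloseShieldDemo {
    // Stand-in for a serializer that closes its target when it is done.
    static void writeAndClose(OutputStream target, String json) throws IOException {
        try (OutputStream t = target) {
            t.write(json.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Shape of the fixed encode(): shield the sink's stream from the close.
    static void encode(String json, OutputStream stream) throws IOException {
        writeAndClose(new NonClosingOutputStream(stream), json);
        stream.write('\n'); // assumption: one JSON record per line
    }

    // Encodes two records into the same stream, as the sink would.
    static TrackingOutputStream runDemo() {
        TrackingOutputStream sinkStream = new TrackingOutputStream();
        try {
            encode("{\"packageName\":\"a\"}", sinkStream);
            encode("{\"packageName\":\"b\"}", sinkStream);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sinkStream;
    }

    public static void main(String[] args) {
        TrackingOutputStream s = runDemo();
        System.out.print(s.toString());
        System.out.println("sink stream closed: " + s.closed);
    }
}
```

In the real encoder, the same effect should be available either through CloseShieldOutputStream from Apache Commons IO, as suggested above, or, as far as I know, by disabling Jackson's JsonGenerator.Feature.AUTO_CLOSE_TARGET on the ObjectMapper so that writeValue leaves the target stream open.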