I am new to Flink and am trying to process a file and write it out formatted as JSON.
Here is a much-simplified version of my job:
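import java.util.concurrent.TimeUnit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class AndroidReader {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000); // StreamingFileSink finalizes part files only on completed checkpoints
        // Each input line is one JSON record
        DataStreamSource<String> lines = env.readTextFile("file:///path/to/file/input.json");
        SingleOutputStreamOperator<AndroidData> android = lines.map(new AndroidDataMapper());
        StreamingFileSink<AndroidData> sink = StreamingFileSink
                .forRowFormat(new Path("file:///path/to/file/output"), new AndroidDataEncoder())
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("filtered").withPartSuffix("json").build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024 * 1024 * 1024) // 1 GiB
                        .build())
                .build();
        android.addSink(sink);
        env.execute("Android");
    }
}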
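
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

// Ignore any JSON fields that have no matching property in this class
@JsonIgnoreProperties(ignoreUnknown = true)
public class AndroidData {
    private String packageName;

    public AndroidData() {
        // no-arg constructor needed by Jackson and by Flink's POJO serializer
    }

    public String getPackageName() {
        return packageName;
    }

    public void setPackageName(String packageName) {
        this.packageName = packageName;
    }
}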
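
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

// Parses one input line (a single JSON object) into an AndroidData instance
public class AndroidDataMapper implements MapFunction<String, AndroidData> {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public AndroidData map(String value) throws Exception {
        return objectMapper.readValue(value, AndroidData.class);
    }
}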
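
The AndroidDataEncoder class:

import java.io.IOException;
import java.io.OutputStream;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.Encoder;

// Serializes each record as JSON into the sink's part-file stream
public class AndroidDataEncoder implements Encoder<AndroidData> {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(AndroidData element, OutputStream stream) throws IOException {
        objectMapper.writeValue(stream, element);
    }
}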

The problem is that I get a ClosedChannelException. The output folder is created, but no files are ever written to it. Here is the stack trace:

java.nio.channels.ClosedChannelException
    at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
    at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
    at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnEvent(DefaultRollingPolicy.java:76)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
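
A possible cause worth checking, offered as an assumption rather than a confirmed diagnosis: by default Jackson's ObjectMapper.writeValue(OutputStream, Object) closes the target stream once it has written the value (JsonGenerator.Feature.AUTO_CLOSE_TARGET is enabled by default). If that closes the sink's part-file stream after the first record, the next call to getPos() on that stream would fail on the closed FileChannel, which matches the top of the trace. The sketch below shows, with only the JDK, one way to shield a stream from a serializer that closes it: wrap it in a stream whose close() only flushes. NonClosingOutputStream, TrackingOutputStream and encode are hypothetical names for illustration, not Flink or Jackson APIs; TrackingOutputStream stands in for the sink's part-file stream, and the try-with-resources in encode plays the role of Jackson's auto-close.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class NonClosingStreamDemo {

    /** Hypothetical wrapper: forwards writes to the wrapped stream but turns close() into flush(). */
    static final class NonClosingOutputStream extends FilterOutputStream {
        NonClosingOutputStream(OutputStream out) {
            super(out);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len); // bulk write instead of FilterOutputStream's byte-by-byte loop
        }

        @Override
        public void close() throws IOException {
            flush(); // deliberately leave the wrapped stream open
        }
    }

    /** Hypothetical stand-in for the sink's part-file stream: rejects writes once closed. */
    static final class TrackingOutputStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public synchronized void write(int b) {
            if (closed) throw new IllegalStateException("stream already closed");
            super.write(b);
        }

        @Override
        public synchronized void write(byte[] b, int off, int len) {
            if (closed) throw new IllegalStateException("stream already closed");
            super.write(b, off, len);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Mimics an encoder whose serializer closes whatever stream it is handed. */
    static void encode(String json, OutputStream sinkStream) throws IOException {
        try (OutputStream s = new NonClosingOutputStream(sinkStream)) {
            s.write(json.getBytes(StandardCharsets.UTF_8));
            s.write('\n'); // one record per line
        }
    }

    public static void main(String[] args) throws IOException {
        TrackingOutputStream partFile = new TrackingOutputStream();
        encode("{\"packageName\":\"com.example.one\"}", partFile);
        // Without the wrapper, the first call would have closed partFile and this one would throw
        encode("{\"packageName\":\"com.example.two\"}", partFile);
        System.out.print(new String(partFile.toByteArray(), StandardCharsets.UTF_8));
        System.out.println("part file still open: " + !partFile.closed);
    }
}
```

Applied to the encoder above, the same idea would be objectMapper.writeValue(new NonClosingOutputStream(stream), element). Staying within Jackson, the alternatives would be objectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false), or stream.write(objectMapper.writeValueAsBytes(element)); either way, closing the part file is left to Flink.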
Any help would be appreciated. Thanks!