Re: OutputFormat vs SinkFunction
Posted by Maximilian Michels
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/OutputFormat-vs-SinkFunction-tp4760p4777.html
Changing the class hierarchy would break backwards compatibility of the API. However, we could add another method to DataStream that makes it easy to use OutputFormats in streaming programs.
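One way to picture the "another method" idea is to leave both class hierarchies alone and wrap an output format in a sink by composition. The Java sketch below is a self-contained model, not Flink code. `SimpleOutputFormat`, `SimpleSinkFunction`, `OutputFormatSink`, `CollectingFormat`, `ToyStream` and the `writeUsingFormat` method are hypothetical names that only mimic the rough shape of the real interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Flink's OutputFormat and SinkFunction.
interface SimpleOutputFormat<T> {
    void open(int taskNumber, int numTasks);
    void writeRecord(T record);
    void close();
}

interface SimpleSinkFunction<T> {
    void invoke(T value);
}

// Adapter by composition: wraps any SimpleOutputFormat without extending it.
class OutputFormatSink<T> implements SimpleSinkFunction<T> {
    private final SimpleOutputFormat<T> format;

    OutputFormatSink(SimpleOutputFormat<T> format) {
        this.format = format;
    }

    void open(int taskNumber, int numTasks) {
        format.open(taskNumber, numTasks);
    }

    @Override
    public void invoke(T value) {
        format.writeRecord(value);
    }

    void close() {
        format.close();
    }
}

// Example format: buffers records and copies them to a target list on close.
class CollectingFormat<T> implements SimpleOutputFormat<T> {
    private final List<T> target;
    private List<T> buffer;

    CollectingFormat(List<T> target) {
        this.target = target;
    }

    @Override
    public void open(int taskNumber, int numTasks) {
        buffer = new ArrayList<>();
    }

    @Override
    public void writeRecord(T record) {
        buffer.add(record);
    }

    @Override
    public void close() {
        target.addAll(buffer);
    }
}

// Toy stream with the existing entry point (addSink) and a hypothetical new one.
class ToyStream<T> {
    private final List<T> elements;

    ToyStream(List<T> elements) {
        this.elements = elements;
    }

    void addSink(SimpleSinkFunction<T> sink) {
        for (T element : elements) {
            sink.invoke(element);
        }
    }

    // Hypothetical convenience method: accepts an output format directly and
    // wraps it, so neither interface hierarchy has to change.
    void writeUsingFormat(SimpleOutputFormat<T> format) {
        OutputFormatSink<T> sink = new OutputFormatSink<>(format);
        sink.open(0, 1); // a single "subtask" in this toy model
        addSink(sink);
        sink.close();
    }
}
```

For example, `new ToyStream<>(List.of(1L, 2L, 3L)).writeUsingFormat(new CollectingFormat<>(out))` leaves `out` holding `[1, 2, 3]`. Because the wrapper holds the format instead of inheriting from it, no existing type changes, which is what keeps the backwards-compatibility concern out of the picture.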
How did you write your adapter? I came up with the one below. Admittedly, it is sort of a hack, but it works fine. By the way, you can also use the DataStream.write(OutputFormat format) method to use any OutputFormat. The code below is only needed if you really want to use DataStream.addSink(SinkFunction function).
Cheers,
Max
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.Collection;

/**
 * Exposes a LocalCollectionOutputFormat as a SinkFunction so it can be passed
 * to DataStream.addSink(). The remaining RichFunction methods (close(),
 * getRuntimeContext(), setRuntimeContext()) are inherited from the output format.
 */
public class OutputFormatAdapter<T> extends LocalCollectionOutputFormat<T>
        implements SinkFunction<T>, RichFunction {

    public OutputFormatAdapter(Collection<T> out) {
        super(out);
    }

    // Every record that reaches the sink is handed to the output format.
    @Override
    public void invoke(T value) throws Exception {
        super.writeRecord(value);
    }

    // Translate the sink's open(Configuration) into the output format's
    // open(taskNumber, numTasks), using this subtask's index and the parallelism.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    // Iterations are not supported by this adapter.
    @Override
    public IterationRuntimeContext getIterationRuntimeContext() {
        throw new UnsupportedOperationException("This is not supported.");
    }

    /** Small test: collects a generated sequence into a local list and prints it (local execution). */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Long> longDataStreamSource = env.generateSequence(0, 1000);
        final ArrayList<Long> longs = new ArrayList<>();
        longDataStreamSource.addSink(new OutputFormatAdapter<>(longs));
        env.execute();
        // The list is filled when the sink closes, so read it only after execute() returns.
        for (long l : longs) {
            System.out.println(l);
        }
    }
}
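To illustrate why open(Configuration) forwards the subtask index and parallelism, and why the list is only complete once env.execute() returns, here is a plain-Java model of the sink lifecycle with several parallel subtasks. It does not use Flink: `SubtaskCollector` and `ParallelSinkDemo` are hypothetical names, threads stand in for subtasks, and the round-robin split of the sequence is an arbitrary choice for the model, not how Flink partitions generateSequence.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical model (not Flink code): each parallel subtask gets its own
// instance, buffers records locally, and merges them into a shared collection on close.
class SubtaskCollector<T> {
    private final Collection<T> shared;
    private List<T> local;

    SubtaskCollector(Collection<T> shared) {
        this.shared = shared;
    }

    // Mirrors open(taskNumber, numTasks); this model only needs a fresh buffer.
    void open(int subtaskIndex, int parallelism) {
        local = new ArrayList<>();
    }

    // Mirrors invoke(value) forwarding to writeRecord(value).
    void invoke(T value) {
        local.add(value);
    }

    // Several subtasks may close at the same time, so merge under a lock.
    void close() {
        synchronized (shared) {
            shared.addAll(local);
        }
    }
}

class ParallelSinkDemo {
    // Splits [from, to] across `parallelism` threads, each driving its own collector.
    static List<Long> run(long from, long to, int parallelism) {
        List<Long> result = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            final int index = i;
            Thread t = new Thread(() -> {
                SubtaskCollector<Long> sink = new SubtaskCollector<>(result);
                sink.open(index, parallelism);
                // Round-robin split: subtask i handles from + i, from + i + parallelism, ...
                for (long v = from + index; v <= to; v += parallelism) {
                    sink.invoke(v);
                }
                sink.close();
            });
            threads.add(t);
            t.start();
        }
        // Like env.execute(), only return once every subtask has closed.
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for subtasks", e);
            }
        }
        return result;
    }
}
```

In this model, `ParallelSinkDemo.run(0, 1000, 4)` returns all 1001 values, but their order depends on which thread closes first, so sort the list if ascending output matters. A similar caveat plausibly applies to the printed output of the original test when the sink runs with a parallelism greater than one.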