Re: OutputFormat vs SinkFunction

Posted by Maximilian Michels on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/OutputFormat-vs-SinkFunction-tp4760p4777.html

Changing the class hierarchy would break backwards-compatibility of the API. However, we could add another method to DataStream to easily use OutputFormats in streaming.

How did you write your adapter? I came up with the one below. Admittedly, it is sort of a hack but works fine. By the way, you can also use the DataStream.write(OutputFormat format) method to use any OutputFormat. The code is below is just if you really only want to use DataStream.addSink(SinkFunction function).

Cheers,
Max

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.Collection;

public class OutputFormatAdapter<T> extends LocalCollectionOutputFormat<T>
implements SinkFunction<T>, RichFunction {

public OutputFormatAdapter(Collection<T> out) {
super(out);
}

@Override
public void invoke(T value) throws Exception {
super.writeRecord(value);
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
}

@Override
public IterationRuntimeContext getIterationRuntimeContext() {
throw new UnsupportedOperationException("This is not supported.");
}


/** Small test */
public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

final DataStreamSource<Long> longDataStreamSource = env.generateSequence(0, 1000);

final ArrayList<Long> longs = new ArrayList<>();

longDataStreamSource.addSink(new OutputFormatAdapter<>(longs));

env.execute();

for (long l : longs) {
System.out.println(l);
}
}
}


On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk <[hidden email]> wrote:
In my case, I have my application code that is calling addSink, for which I'm writing a test that needs to use LocalCollectionOutputFormat. Having two separate class hierarchies is not helpful, hence the adapter. Much of this code already exists in the implementation of FileSinkFunction, so the project already supports it in a limited way.

On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels <[hidden email]> wrote:
Hi Nick,

SinkFunction just implements user-defined functions on incoming
elements. OutputFormat offers more lifecycle methods. Thus it is a
more powerful interface. The OutputFormat originally comes from the
batch API, whereas the SinkFunction originates from streaming. Those
were more separate code paths in the past. Ultimately, it would make
sense to have only the OutputFormat interface but I think we have to
keep it to not break the API.

If you need the lifecycle methods in streaming, there is
RichSinkFunction, which implements OutputFormat and SinkFunction. In
addition, it gives you access to the RuntimeContext. You can pass this
directly to the "addSink(sinkFunction)" API method.

Cheers,
Max

On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk <[hidden email]> wrote:
> Heya,
>
> Is there a plan to consolidate these two interfaces? They appear to provide
> identical functionality, differing only in lifecycle management. I found
> myself writing an adaptor so I can consume an OutputFormat where a
> SinkFunction is expected; there's not much to it. This seems like code that
> Flink should ship.
>
> Maybe one interface or the other can be deprecated for 1.0 API?
>
> Thanks,
> Nick