Fwd: some question about side output

Fwd: some question about side output

Marvin777
Hi, all:

Using Flink's side output feature, we can get a stream of the data that was discarded as late. But when I use the getSideOutput() method, I get the following error message:

Thanks & Regards
Qingxiang Ma

Re: Fwd: some question about side output

Biplob Biswas
Change the declared type of the main stream from DataStream to
SingleOutputStreamOperator.

The getSideOutput() method is not defined on the base class DataStream but on
its subclass SingleOutputStreamOperator.
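
To illustrate why the declared type matters, here is a minimal plain-Java sketch. It does not use Flink at all: BaseStream and OperatorStream are hypothetical stand-ins for DataStream and SingleOutputStreamOperator, and applyWindow() is an invented helper that plays the role of a window operation. The only point is that a method defined on a subclass is not visible through a variable declared as the base class.

```java
import java.util.ArrayList;
import java.util.List;

public class DeclaredTypeSketch {

    // Hypothetical stand-in for DataStream: it has no getSideOutput().
    static class BaseStream {
        final List<String> main = new ArrayList<>();
    }

    // Hypothetical stand-in for SingleOutputStreamOperator:
    // the subclass is where getSideOutput() is defined.
    static class OperatorStream extends BaseStream {
        final List<String> late = new ArrayList<>();

        List<String> getSideOutput() {
            return late;
        }
    }

    // Invented helper standing in for a window operation; it returns the subclass type.
    static OperatorStream applyWindow() {
        OperatorStream out = new OperatorStream();
        out.main.add("a");
        out.main.add("c");
        out.late.add("b");
        return out;
    }

    public static void main(String[] args) {
        // Declared as the base type: the assignment compiles, but the
        // subclass method is hidden from the compiler.
        BaseStream asBase = applyWindow();
        // asBase.getSideOutput();   // would not compile: cannot find symbol

        // Declared as the subclass type: getSideOutput() is visible.
        OperatorStream asOperator = applyWindow();
        System.out.println(asOperator.getSideOutput()); // prints [b]
    }
}
```

In the real job, the equivalent change is only to the type of the variable that holds the windowed result; the pipeline itself stays the same.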



Re: some question about side output

Chen Qin
In reply to this post by Marvin777
Hi Qingxiang,

getSideOutput() is only available on the SingleOutputStreamOperator class. Consider changing the declared type of your windowed stream from DataStream<...> to SingleOutputStreamOperator<...>, and it should work fine.

Thanks,
Chen

A code sample is attached for reference.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;

public class SideOutputExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Created as an anonymous subclass ({}) so the tag keeps its generic type information.
        final OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("tag") {};

        // Declared as SingleOutputStreamOperator (not DataStream) so that getSideOutput() is accessible.
        SingleOutputStreamOperator<String> output = env
            .addSource(new SourceFunction<Tuple2<String, Long>>() {
                @Override
                public void run(SourceContext<Tuple2<String, Long>> sourceContext) throws Exception {
                    // Emit three events; "b" (timestamp 1) is emitted out of order,
                    // after "c" (timestamp 2), so it can arrive late.
                    synchronized (sourceContext.getCheckpointLock()) {
                        sourceContext.collect(new Tuple2<String, Long>("a", 0L));
                        sourceContext.collect(new Tuple2<String, Long>("c", 2L));
                        sourceContext.collect(new Tuple2<String, Long>("b", 1L));
                    }
                }

                @Override
                public void cancel() {
                    // Nothing to do: run() returns after emitting the three events.
                }
            })
            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
                    // Emit a watermark after every element, equal to that element's timestamp.
                    return new Watermark(lastElement.f1);
                }

                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                    return element.f1;
                }
            })
            .timeWindowAll(Time.milliseconds(1))
            .sideOutputLateData(lateTag)
            .apply(new AllWindowFunction<Tuple2<String, Long>, String, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<String> out) throws Exception {
                    for (Tuple2<String, Long> value : values) {
                        out.collect(value.f0);
                    }
                }
            });

        // Print on-time events.
        output.print();

        // Print late-arriving events.
        output.getSideOutput(lateTag).print();

        env.execute();
    }
}

On Wed, Sep 6, 2017 at 6:41 AM, 马庆祥 <[hidden email]> wrote:
Hi, all:

Using Flink's side output feature, we can get a stream of the data that was discarded as late. But when I use the getSideOutput() method, I get the following error message:

Thanks & Regards
Qingxiang Ma