|
Hi Qingxiang,
getSideOutput is only available on the SingleOutputStreamOperator class. Consider changing the declared type of your windowed DataStream<...> result to SingleOutputStreamOperator, and it should work fine.
Thanks, Chen
A code sample is attached below for convenience.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import javax.annotation.Nullable;
public class SideOutputExample {
public static void main(String argv[]) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("tag"){};
SingleOutputStreamOperator output = env.addSource(new SourceFunction<Tuple2<String, Long>>() { public void run(SourceContext<Tuple2<String, Long>> sourceContext) throws Exception { // emit three events synchronized (sourceContext.getCheckpointLock()) { sourceContext.collect(new Tuple2<String, Long>("a", 0l)); sourceContext.collect(new Tuple2<String, Long>("c", 2l)); sourceContext.collect(new Tuple2<String, Long>("b", 1l)); } }
public void cancel() { //SKIP } }).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() { @Nullable public Watermark checkAndGetNextWatermark(Tuple2<String, Long> stringLongTuple2, long l) { return new Watermark(stringLongTuple2.f1); }
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) { return stringLongTuple2.f1; } }).timeWindowAll(Time.milliseconds(1)).sideOutputLateData(lateTag) .apply(new AllWindowFunction<Tuple2<String,Long>, String, TimeWindow>() { public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception { for(Tuple2<String, Long> it : iterable) { collector.collect(it.f0); } } });
//print on time event output.print();
// print late arriving event output.getSideOutput(lateTag).print();
env.execute(); } }
|