import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Runs a small bounded Flink pipeline on a local mini cluster and checks that every input
 * element reaches the sink.
 *
 * <p>The pipeline is: integer source, one-second tumbling processing-time window (with a
 * caller-supplied trigger) that batches elements into lists, an async operator that echoes each
 * batch, a flat-map that unpacks the batches again, and a collecting sink.
 */
public class StreamTest {

    /**
     * Runs the pipeline with a count trigger that fires once ten elements are in the window,
     * i.e. as many elements as the input contains.
     */
    @Test
    @Tag("unit")
    public void testPipelineWithCountTrigger() throws Exception {
        runPipeline(10, CountTrigger.of(10));
    }

    /**
     * Runs the pipeline with the plain processing-time trigger.
     *
     * <p>NOTE(review): the original author marked this case with ":(" - presumably because the
     * bounded source can finish before the one-second processing-time window fires, in which
     * case nothing reaches the sink. Confirm against the Flink version in use.
     */
    @Test
    @Tag("unit")
    public void testPipelineWithProcessingTimeTrigger() throws Exception {
        runPipeline(10, ProcessingTimeTrigger.create());
    }

    /**
     * Builds and executes the pipeline on a fresh single-slot mini cluster and asserts that the
     * sink received exactly {@code inputSize} elements.
     *
     * @param inputSize number of integers (1..inputSize) fed into the pipeline
     * @param trigger trigger installed on the tumbling processing-time window
     * @throws Exception if the cluster cannot be started or the job fails
     */
    private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {
        MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberSlotsPerTaskManager(1)
                        .setNumberTaskManagers(1)
                        .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, TimeUnit.DAYS))
                        .build()
        );
        miniCluster.before();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // The sink stores into a static list, so leftovers from a previous test must go.
            CollectSink.values.clear();

            List<Integer> listOfNumbers =
                    IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());

            // 1st half of pipeline: batch the elements of each window into one list
            DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                    .trigger(trigger)
                    .process(new Batcher());

            // 2nd half of pipeline: async echo (capacity 1, one-day timeout), then unpack batches
            DataStream<Integer> pipeB = AsyncDataStream
                    .unorderedWait(pipeA, new AsyncUDF(), 1L, TimeUnit.DAYS, 1)
                    .flatMap((List<Integer> records, Collector<Integer> out) -> records.forEach(out::collect))
                    .returns(Types.INT);

            pipeB.addSink(new CollectSink());

            // execute() blocks until the bounded job has finished, so the sink contents are
            // final when it returns and no extra sleep is needed before asserting.
            env.execute();
        } finally {
            // Always release the cluster, even when job construction or execution fails.
            miniCluster.after();
        }
        assertEquals(inputSize, CollectSink.values.size());
    }

    /** Window function that emits all elements of a fired window as a single list. */
    public static class Batcher extends ProcessAllWindowFunction<Integer, List<Integer>, TimeWindow> {
        @Override
        public void process(Context context, Iterable<Integer> elements, Collector<List<Integer>> out) throws Exception {
            out.collect(StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList()));
        }
    }

    /** Async function that returns its input batch unchanged after a short artificial delay. */
    private static class AsyncUDF extends RichAsyncFunction<List<Integer>, List<Integer>> {

        /**
         * Simulates a remote call: waits 100 ms on another thread, then yields {@code value}.
         *
         * @param value batch to echo back
         * @return future completed with {@code value}
         */
        private CompletableFuture<List<Integer>> doAsyncStuff(List<Integer> value) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the executing thread instead of dropping it.
                    Thread.currentThread().interrupt();
                }
                return value;
            });
        }

        @Override
        public void asyncInvoke(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
            // Forward failures too; otherwise a failed future would leave the result future
            // pending until the operator timeout expires.
            doAsyncStuff(input).whenComplete((stuff, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(Collections.singleton(stuff));
                }
            });
        }

        @Override
        public void timeout(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
            resultFuture.completeExceptionally(new RuntimeException("Timeout!"));
        }
    }

    /** Testing sink that records every received element in a shared static list. */
    private static class CollectSink implements SinkFunction<Integer> {
        // Static so the test can read the results; the list itself is synchronized because
        // locking on the sink instance would not guard state shared across instances.
        public static final List<Integer> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Integer value) throws Exception {
            values.add(value);
        }
    }
}