MiniCluster with ProcessingTimeTrigger

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

MiniCluster with ProcessingTimeTrigger

John Morrow
Hi All,

I'm trying to test a pipeline that consists of two Flink tasks with a MiniCluster. The 1st task has a WindowAll operator which groups items into batches every second, and the 2nd task does an async operation with each batch and flatMaps the result.

I've whittled it down to the bare bones below. There are two tests:
  • testPipelineWithCountTrigger - this one works fine 🙂
  • testPipelineWithProcessingTimeTrigger - this one doesn't give any output 🙁

It seems like a timing issue. If I step through the failing one slowly I can see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear methods do get called, and the asyncInvoke method also gets called, but when I run it the 2nd test fails as it produces no output. I've tried setting the MiniCluster timeout to 1 day, the same with my AsyncUDF timeout, and sleeping for 3 * window after env.execute but no difference. I'm running this with Flink 1.9.0 and OpenJDK8 on Ubuntu (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).


Any idea how I can get the 2nd test to wait to process the output?


Thanks 🙂

John.






import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.junit.jupiter.api.Assertions.assertEquals;


public class StreamTest {

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
    runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
    runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {

    MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(1)
            .setNumberTaskManagers(1)
            .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, TimeUnit.DAYS))
            .build()
    );
    miniCluster.before();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());

    // 1st half of pipeline
    DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .trigger(trigger)
        .process(new Batcher());

    // 2nd half of pipeline
    DataStream<Integer> pipeB = AsyncDataStream.unorderedWait(pipeA, new AsyncUDF(), 1L, TimeUnit.DAYS, 1 )
        .flatMap((List<Integer> records, Collector<Integer> out) -> records.forEach(out::collect)).returns(Types.INT);
    pipeB.addSink(new CollectSink());

    env.execute();

    try {
      Thread.sleep(1000L * 3);
    } catch (InterruptedException e) {
      System.out.println();
    }
    miniCluster.after();

    assertEquals(inputSize, CollectSink.values.size());
  }


  public static class Batcher extends ProcessAllWindowFunction<Integer, List<Integer>, TimeWindow> {
    @Override
    public void process(Context context, Iterable<Integer> elements, Collector<List<Integer>> out) throws Exception {
      out.collect(StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList()));
    }
  }

  private static class AsyncUDF extends RichAsyncFunction<List<Integer>, List<Integer>> {

    private CompletableFuture<List<Integer>> doAsyncStuff(List<Integer> value) {
      return CompletableFuture.supplyAsync(() -> {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return value;
      });
    }

    @Override
    public void asyncInvoke(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
      doAsyncStuff(input).thenAccept(stuff -> resultFuture.complete(Collections.singleton(stuff)));
    }

    @Override
    public void timeout(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
      resultFuture.completeExceptionally(new RuntimeException("Timeout!"));
    }
  }

  // create a testing sink
  private static class CollectSink implements SinkFunction<Integer> {
    public static final List<Integer> values = new ArrayList<>();

    @Override
    public synchronized void invoke(Integer value) throws Exception {
      values.add(value);
    }
  }

}
Reply | Threaded
Open this post in threaded view
|

Re: MiniCluster with ProcessingTimeTrigger

Biao Liu
Hi John,

The root cause is that the collection source exits too fast, so the window operator also shuts down before it is triggered.

You could verify that by waiting a second before releasing the window. For example, insert a map operator between the source and the window operator, and block for a second or more in the "close" method of this map operator. You will see that the window then works as expected.

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 06:24, John Morrow <[hidden email]> wrote:
Hi All,

I'm trying to test a pipeline that consists of two Flink tasks with a MiniCluster. The 1st task has a WindowAll operator which groups items into batches every second, and the 2nd task does an async operation with each batch and flatMaps the result.

I've whittled it down to the bare bones below. There are two tests:
  • testPipelineWithCountTrigger - this one works fine 🙂
  • testPipelineWithProcessingTimeTrigger - this one doesn't give any output 🙁

It seems like a timing issue. If I step through the failing one slowly I can see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear methods do get called, and the asyncInvoke method also gets called, but when I run it the 2nd test fails as it produces no output. I've tried setting the MiniCluster timeout to 1 day, the same with my AsyncUDF timeout, and sleeping for 3 * window after env.execute but no difference. I'm running this with Flink 1.9.0 and OpenJDK8 on Ubuntu (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).


Any idea how I can get the 2nd test to wait to process the output?


Thanks 🙂

John.






import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.junit.jupiter.api.Assertions.assertEquals;


public class StreamTest {

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
    runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
    runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {

    MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(1)
            .setNumberTaskManagers(1)
            .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, TimeUnit.DAYS))
            .build()
    );
    miniCluster.before();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());

    // 1st half of pipeline
    DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .trigger(trigger)
        .process(new Batcher());

    // 2nd half of pipeline
    DataStream<Integer> pipeB = AsyncDataStream.unorderedWait(pipeA, new AsyncUDF(), 1L, TimeUnit.DAYS, 1 )
        .flatMap((List<Integer> records, Collector<Integer> out) -> records.forEach(out::collect)).returns(Types.INT);
    pipeB.addSink(new CollectSink());

    env.execute();

    try {
      Thread.sleep(1000L * 3);
    } catch (InterruptedException e) {
      System.out.println();
    }
    miniCluster.after();

    assertEquals(inputSize, CollectSink.values.size());
  }


  public static class Batcher extends ProcessAllWindowFunction<Integer, List<Integer>, TimeWindow> {
    @Override
    public void process(Context context, Iterable<Integer> elements, Collector<List<Integer>> out) throws Exception {
      out.collect(StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList()));
    }
  }

  private static class AsyncUDF extends RichAsyncFunction<List<Integer>, List<Integer>> {

    private CompletableFuture<List<Integer>> doAsyncStuff(List<Integer> value) {
      return CompletableFuture.supplyAsync(() -> {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return value;
      });
    }

    @Override
    public void asyncInvoke(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
      doAsyncStuff(input).thenAccept(stuff -> resultFuture.complete(Collections.singleton(stuff)));
    }

    @Override
    public void timeout(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
      resultFuture.completeExceptionally(new RuntimeException("Timeout!"));
    }
  }

  // create a testing sink
  private static class CollectSink implements SinkFunction<Integer> {
    public static final List<Integer> values = new ArrayList<>();

    @Override
    public synchronized void invoke(Integer value) throws Exception {
      values.add(value);
    }
  }

}
Reply | Threaded
Open this post in threaded view
|

Re: MiniCluster with ProcessingTimeTrigger

John Morrow
Thanks Biao!

I tried slowing down the input stream by replacing the env.fromCollection() with a custom SourceFunction (below) which drip feeds the data a bit slower. By the way, in my real scenario the datasource for the pipeline will be a RabbitMQ source.


I do get better results, but it seems like a timing issue still exists:

  +-- StreamTest [OK]
  | +-- testPipelineWithProcessingTimeTrigger() 10480 ms [X] expected: <10> but was: <6>
  | '-- testPipelineWithCountTrigger() [OK]


I can probably play around with the window-size & sleep times below and get my tests to pass but I'm more concerned if there's a potential race condition/coordination step, outside of the MiniCluster test environment, that I should be dealing with.

My pipeline is broken into two parts: pipeA & pipeB. I've done this because the 2nd part has an async operator, so it needs to be at the start of a chain. For a non-MiniCluster environment, would it be possible for records to flow through pipeA and not reach pipeB, as I'm seeing with the MiniCluster? I.e., is there something I need to do to explicitly connect/sync pipeA & pipeB before calling env.execute(), besides the fact that:

pipeB = AsyncDataStream.unorderedWait(pipeA, ...


Thanks!
John.




public class StreamTest {

  private static class DripFeed extends RichSourceFunction<Integer> {

    private volatile boolean isRunning = false;
    private final int inputSize;

    public DripFeed(int inputSize) {
      this.inputSize = inputSize;
    }

    @Override
    public void open(Configuration parameters) {
      isRunning = true;
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
      List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());
      Iterator<Integer> iterator = listOfNumbers.iterator();
      while (isRunning && iterator.hasNext()) {
        try {
          Thread.sleep(100L);
        } catch (InterruptedException e) {
          System.out.println();
        }
        ctx.collect(iterator.next());
      }
      try {
        Thread.sleep(1000L);
      } catch (InterruptedException e) {
        System.out.println();
      }
    }

    @Override
    public void cancel() {
      isRunning = false;
    }

  }

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
    runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
    runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {

    MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(1)
            .setNumberTaskManagers(1)
            .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, TimeUnit.DAYS))
            .build()
    );
    miniCluster.before();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());

    // 1st half of pipeline
    //DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
    DataStream<List<Integer>> pipeA = env.addSource(new StreamTest.DripFeed(inputSize))
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))

    ...(same as before...)




From: Biao Liu <[hidden email]>
Sent: Tuesday 17 December 2019 21:50
To: John Morrow <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: MiniCluster with ProcessingTimeTrigger
 
Hi John,

The root cause is that the collection source exits too fast, so the window operator also shuts down before it is triggered.

You could verify that by waiting a second before releasing the window. For example, insert a map operator between the source and the window operator, and block for a second or more in the "close" method of this map operator. You will see that the window then works as expected.

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 06:24, John Morrow <[hidden email]> wrote:
Hi All,

I'm trying to test a pipeline that consists of two Flink tasks with a MiniCluster. The 1st task has a WindowAll operator which groups items into batches every second, and the 2nd task does an async operation with each batch and flatMaps the result.

I've whittled it down to the bare bones below. There are two tests:
  • testPipelineWithCountTrigger - this one works fine 🙂
  • testPipelineWithProcessingTimeTrigger - this one doesn't give any output 🙁

It seems like a timing issue. If I step through the failing one slowly I can see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear methods do get called, and the asyncInvoke method also gets called, but when I run it the 2nd test fails as it produces no output. I've tried setting the MiniCluster timeout to 1 day, the same with my AsyncUDF timeout, and sleeping for 3 * window after env.execute but no difference. I'm running this with Flink 1.9.0 and OpenJDK8 on Ubuntu (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).


Any idea how I can get the 2nd test to wait to process the output?


Thanks 🙂

John.






import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.junit.jupiter.api.Assertions.assertEquals;


public class StreamTest {

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
    runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
    runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {

    MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(1)
            .setNumberTaskManagers(1)
            .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, TimeUnit.DAYS))
            .build()
    );
    miniCluster.before();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());

    // 1st half of pipeline
    DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .trigger(trigger)
        .process(new Batcher());

    // 2nd half of pipeline
    DataStream<Integer> pipeB = AsyncDataStream.unorderedWait(pipeA, new AsyncUDF(), 1L, TimeUnit.DAYS, 1 )
        .flatMap((List<Integer> records, Collector<Integer> out) -> records.forEach(out::collect)).returns(Types.INT);
    pipeB.addSink(new CollectSink());

    env.execute();

    try {
      Thread.sleep(1000L * 3);
    } catch (InterruptedException e) {
      System.out.println();
    }
    miniCluster.after();

    assertEquals(inputSize, CollectSink.values.size());
  }


  public static class Batcher extends ProcessAllWindowFunction<Integer, List<Integer>, TimeWindow> {
    @Override
    public void process(Context context, Iterable<Integer> elements, Collector<List<Integer>> out) throws Exception {
      out.collect(StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList()));
    }
  }

  private static class AsyncUDF extends RichAsyncFunction<List<Integer>, List<Integer>> {

    private CompletableFuture<List<Integer>> doAsyncStuff(List<Integer> value) {
      return CompletableFuture.supplyAsync(() -> {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return value;
      });
    }

    @Override
    public void asyncInvoke(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
      doAsyncStuff(input).thenAccept(stuff -> resultFuture.complete(Collections.singleton(stuff)));
    }

    @Override
    public void timeout(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
      resultFuture.completeExceptionally(new RuntimeException("Timeout!"));
    }
  }

  // create a testing sink
  private static class CollectSink implements SinkFunction<Integer> {
    public static final List<Integer> values = new ArrayList<>();

    @Override
    public synchronized void invoke(Integer value) throws Exception {
      values.add(value);
    }
  }

}
Reply | Threaded
Open this post in threaded view
|

Re: MiniCluster with ProcessingTimeTrigger

Biao Liu
Hi John,

The critical issue of your test case is that it's a finite streaming job. The mini cluster or distributed cluster does not matter.

When the job finishes, some windows may not have been triggered yet. The current behavior is to drop these windows. This is acceptable from the perspective of window semantics: because the trigger condition (count- or time-based) has not been fulfilled, the window should not fire.

Normally windows are used in infinite (or long-running) streaming jobs, so finishing or stopping should be rare. Even if such a job is finished or stopped, the windows that have not been triggered are dropped as well. BUT they can be replayed if the job recovers from a checkpoint or savepoint.

Let's get back to your test case. If you want the correct result, you have to fulfill the window's trigger condition. If the condition is a 2-second processing-time window, you have to guarantee that the window operator runs for at least 2 seconds. However, it's not a good idea to design a time-based test case. I would suggest designing the case so that the source does not exit until the window has been triggered. The source could block on some signal (a CountDownLatch?) that is released after the window has fired.

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 06:06, John Morrow <[hidden email]> wrote:
Thanks Biao!

I tried slowing down the input stream by replacing the env.fromCollection() with a custom SourceFunction (below) which drip feeds the data a bit slower. By the way, in my real scenario the datasource for the pipeline will be a RabbitMQ source.


I do get better results, but it seems like a timing issue still exists:

  +-- StreamTest [OK]
  | +-- testPipelineWithProcessingTimeTrigger() 10480 ms [X] expected: <10> but was: <6>
  | '-- testPipelineWithCountTrigger() [OK]


I can probably play around with the window-size & sleep times below and get my tests to pass but I'm more concerned if there's a potential race condition/coordination step, outside of the MiniCluster test environment, that I should be dealing with.

My pipeline is broken into two parts: pipeA & pipeB. I've done this because the 2nd part has an async operator, so it needs to be at the start of a chain. For a non-MiniCluster environment, would it be possible for records to flow through pipeA and not reach pipeB, as I'm seeing with the MiniCluster? I.e., is there something I need to do to explicitly connect/sync pipeA & pipeB before calling env.execute(), besides the fact that:

pipeB = AsyncDataStream.unorderedWait(pipeA, ...


Thanks!
John.




public class StreamTest {

  private static class DripFeed extends RichSourceFunction<Integer> {

    private volatile boolean isRunning = false;
    private final int inputSize;

    public DripFeed(int inputSize) {
      this.inputSize = inputSize;
    }

    @Override
    public void open(Configuration parameters) {
      isRunning = true;
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
      List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());
      Iterator<Integer> iterator = listOfNumbers.iterator();
      while (isRunning && iterator.hasNext()) {
        try {
          Thread.sleep(100L);
        } catch (InterruptedException e) {
          System.out.println();
        }
        ctx.collect(iterator.next());
      }
      try {
        Thread.sleep(1000L);
      } catch (InterruptedException e) {
        System.out.println();
      }
    }

    @Override
    public void cancel() {
      isRunning = false;
    }

  }

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
    runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
    runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {

    MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(1)
            .setNumberTaskManagers(1)
            .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, TimeUnit.DAYS))
            .build()
    );
    miniCluster.before();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());

    // 1st half of pipeline
    //DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
    DataStream<List<Integer>> pipeA = env.addSource(new StreamTest.DripFeed(inputSize))
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))

    ...(same as before...)




From: Biao Liu <[hidden email]>
Sent: Tuesday 17 December 2019 21:50
To: John Morrow <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: MiniCluster with ProcessingTimeTrigger
 
Hi John,

The root cause is that the collection source exits too fast, so the window operator also shuts down before it is triggered.

You could verify that by waiting a second before releasing the window. For example, insert a map operator between the source and the window operator, and block for a second or more in the "close" method of this map operator. You will see that the window then works as expected.

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 06:24, John Morrow <[hidden email]> wrote:
Hi All,

I'm trying to test a pipeline that consists of two Flink tasks with a MiniCluster. The 1st task has a WindowAll operator which groups items into batches every second, and the 2nd task does an async operation with each batch and flatMaps the result.

I've whittled it down to the bare bones below. There are two tests:
  • testPipelineWithCountTrigger - this one works fine 🙂
  • testPipelineWithProcessingTimeTrigger - this one doesn't give any output 🙁

It seems like a timing issue. If I step through the failing one slowly I can see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear methods do get called, and the asyncInvoke method also gets called, but when I run it the 2nd test fails as it produces no output. I've tried setting the MiniCluster timeout to 1 day, the same with my AsyncUDF timeout, and sleeping for 3 * window after env.execute but no difference. I'm running this with Flink 1.9.0 and OpenJDK8 on Ubuntu (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).


Any idea how I can get the 2nd test to wait to process the output?


Thanks 🙂

John.






import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.junit.jupiter.api.Assertions.assertEquals;


public class StreamTest {

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
    runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
    runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {

    MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(1)
            .setNumberTaskManagers(1)
            .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, TimeUnit.DAYS))
            .build()
    );
    miniCluster.before();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    List<Integer> listOfNumbers = IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());

    // 1st half of pipeline
    DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .trigger(trigger)
        .process(new Batcher());

    // 2nd half of pipeline
    DataStream<Integer> pipeB = AsyncDataStream.unorderedWait(pipeA, new AsyncUDF(), 1L, TimeUnit.DAYS, 1 )
        .flatMap((List<Integer> records, Collector<Integer> out) -> records.forEach(out::collect)).returns(Types.INT);
    pipeB.addSink(new CollectSink());

    env.execute();

    try {
      Thread.sleep(1000L * 3);
    } catch (InterruptedException e) {
      System.out.println();
    }
    miniCluster.after();

    assertEquals(inputSize, CollectSink.values.size());
  }


  public static class Batcher extends ProcessAllWindowFunction<Integer, List<Integer>, TimeWindow> {
    @Override
    public void process(Context context, Iterable<Integer> elements, Collector<List<Integer>> out) throws Exception {
      out.collect(StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList()));
    }
  }

  private static class AsyncUDF extends RichAsyncFunction<List<Integer>, List<Integer>> {

    private CompletableFuture<List<Integer>> doAsyncStuff(List<Integer> value) {
      return CompletableFuture.supplyAsync(() -> {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return value;
      });
    }

    @Override
    public void asyncInvoke(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
      doAsyncStuff(input).thenAccept(stuff -> resultFuture.complete(Collections.singleton(stuff)));
    }

    @Override
    public void timeout(List<Integer> input, ResultFuture<List<Integer>> resultFuture) throws Exception {
      resultFuture.completeExceptionally(new RuntimeException("Timeout!"));
    }
  }

  // create a testing sink
  private static class CollectSink implements SinkFunction<Integer> {
    public static final List<Integer> values = new ArrayList<>();

    @Override
    public synchronized void invoke(Integer value) throws Exception {
      values.add(value);
    }
  }

}