i Incurred in situation and i need you help.
first my timeWindow is set for 15sec, but the assertyEquals doesnt wait for the window getting the answer,
so everything is telling me index 0 out of bounds (cuze its didnt get to place my object in the list yet)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class IntergrationTest extends AbstractTestBase {
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure your test environment
env.setParallelism(1);
// values are collected in a static variable
CollectSink.values.clear();
LinkedList<CameraEvent> events = GenerateEvents();
env.fromCollection(events)
.keyBy(new StreamingJob.GetKey())
.timeWindow(Time.seconds(10))
.minBy("dateTime")
.addSink(new CollectSink());
env.execute("lala");
assertEquals(events.get(1), CollectSink.values.get(0));
}
private static LinkedList<CameraEvent> GenerateEvents() {
LinkedList<CameraEvent> linkedList;
CameraEvent cameraEvent;
linkedList = new LinkedList<>();
for (int i = 0; i < 2; i++) {
cameraEvent = new CameraEvent("123-123-12", 1, new Date(), "OUT", "CAR");
linkedList.add(cameraEvent);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return linkedList;
}
private static class CollectSink implements SinkFunction<CameraEvent> {
// must be static
public static final List<CameraEvent> values = new ArrayList<>();
@Override
public synchronized void invoke(CameraEvent value) throws Exception {
values.add(value);
}
}
}