How to test window

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

How to test window

עדן שרקון
Hey guys,
I ran into a situation and I need your help.

I'm trying to use a unit test in order to check my results.
First, my timeWindow is set for 15 sec, but the assertEquals doesn't wait for the window to produce its result,

so everything is telling me index 0 is out of bounds (because it didn't get to place my object in the list yet).

thank you all!

 import org.apache.flink.annotation.Public;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class IntergrationTest extends AbstractTestBase {



@Test
public void test() throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// configure your test environment
env.setParallelism(1);

// values are collected in a static variable
CollectSink.values.clear();
LinkedList<CameraEvent> events = GenerateEvents();
env.fromCollection(events)
.keyBy(new StreamingJob.GetKey())
.timeWindow(Time.seconds(10))
.minBy("dateTime")
.addSink(new CollectSink());

env.execute("lala");
assertEquals(events.get(1), CollectSink.values.get(0));
}

private static LinkedList<CameraEvent> GenerateEvents() {
LinkedList<CameraEvent> linkedList;
CameraEvent cameraEvent;
linkedList = new LinkedList<>();
for (int i = 0; i < 2; i++) {

cameraEvent = new CameraEvent("123-123-12", 1, new Date(), "OUT", "CAR");

linkedList.add(cameraEvent);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return linkedList;
}
private static class CollectSink implements SinkFunction<CameraEvent> {

// must be static
public static final List<CameraEvent> values = new ArrayList<>();

@Override
public synchronized void invoke(CameraEvent value) throws Exception {
values.add(value);
}
}


}


Reply | Threaded
Open this post in threaded view
|

Re: How to test window

Chesnay Schepler
Since you define a 15 second window you have to ensure that your source generates at least 15 seconds worth of data; otherwise the window will never fire.
Since you do not use event-time your source has to actually run for at least 15 seconds; for this case collection sources will simply not work. You need a custom SourceFunction that emits your data over a 15 + x seconds period.

On 20.12.2018 15:12, עדן שרקון wrote:
Hey guys,
I ran into a situation and I need your help.

I'm trying to use a unit test in order to check my results.
First, my timeWindow is set for 15 sec, but the assertEquals doesn't wait for the window to produce its result,

so everything is telling me index 0 is out of bounds (because it didn't get to place my object in the list yet).

thank you all!

 import org.apache.flink.annotation.Public;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.com.CameraEvent;
import org.com.StreamingJob;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class IntergrationTest extends AbstractTestBase {
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure your test environment
env.setParallelism(1);
// values are collected in a static variable
CollectSink.values.clear();
LinkedList<CameraEvent> events = GenerateEvents();
env.fromCollection(events)
.keyBy(new StreamingJob.GetKey())
.timeWindow(Time.seconds(10))
.minBy("dateTime")
.addSink(new CollectSink());
env.execute("lala");
assertEquals(events.get(1), CollectSink.values.get(0));
}
private static LinkedList<CameraEvent> GenerateEvents() {
LinkedList<CameraEvent> linkedList;
CameraEvent cameraEvent;
linkedList = new LinkedList<>();
for (int i = 0; i < 2; i++) {
cameraEvent = new CameraEvent("123-123-12", 1, new Date(), "OUT", "CAR");
linkedList.add(cameraEvent);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return linkedList;
}
private static class CollectSink implements SinkFunction<CameraEvent> {
// must be static
public static final List<CameraEvent> values = new ArrayList<>();
@Override
public synchronized void invoke(CameraEvent value) throws Exception {
values.add(value);
}
}
}