What is the best way to run unit tests on streams that contain processing-time windows?
Example:
def bufferDataStreamByProcessId(ds: DataStream[MaalkaRecord]): DataStream[MaalkaRecord] = {
  ds.map { r =>
    println(s"data in: $r") // Data shows up here
    r
  }.keyBy { mr =>
    val r = mr.asInstanceOf[MaalkaDataRecord]
    r.dataProcessingId -> r.isBefore
  }
  .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(10)))
  .reduce { (_, r2) =>
    println(s"Reducing: $r2") // Data does NOT show up here
    r2
  }
  .map { r =>
    println(s"Emitted: $r") // Data does NOT show up here
    r
  }
}
This stream completes about 100 ms after the first element is received by the ProcessingTimeSessionWindows assigner, but no data is emitted from the session window.
If I change the window to TumblingProcessingTimeWindows.of(Time.milliseconds(1)), some of the windows do emit, but many of the expected values are missing.
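For reference, here is a minimal sketch of how the behaviour might be reproduced in isolation. It assumes the Flink 1.x Scala DataStream API and a bounded fromElements source. Rec and SessionWindowRepro are hypothetical stand-ins, not the real Maalka types. One thing it may help isolate is whether the job finishes before the processing-time timer for the last session has a chance to fire.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SessionWindowRepro {
  // Hypothetical record type standing in for MaalkaDataRecord.
  final case class Rec(id: String, value: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env
      // Bounded source: the job ends as soon as these elements are consumed.
      .fromElements(Rec("a", 1), Rec("a", 2), Rec("b", 3))
      .keyBy(_.id)
      // Same kind of window as in the example above, with a 10 ms gap.
      .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(10)))
      // Keep the most recent record per session, as in the original reduce.
      .reduce((_, r2) => r2)
      .print()

    env.execute("session-window-repro")
  }
}
```

If this small job also finishes without printing anything, that would suggest the issue is with how bounded input interacts with processing-time windows rather than with my key selector or record types.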
Any ideas?
Cheers,
Clay