UnitTests and ProcessTimeWindows - Missing results

Clay Teeter
What is the best way to unit-test streams that contain processing-time windows?

Example:
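import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

def bufferDataStreamByProcessId(ds: DataStream[MaalkaRecord]): DataStream[MaalkaRecord] = {
  ds.map { r =>
      println(s"data in: $r") // Data shows up here
      r
    }
    .keyBy { mr =>
      // Key by (processing id, isBefore)
      val r = mr.asInstanceOf[MaalkaDataRecord]
      r.dataProcessingId -> r.isBefore
    }
    // Session closes after 10 ms without new elements for a key
    .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(10)))
    .reduce { (_, r2) =>
      println(s"Reducing: $r2") // Data does NOT show up here
      r2
    }
    .map { r =>
      println(s"Emitted: $r") // Data does NOT show up here
      r
    }
}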

The stream completes about 100 ms after the first element reaches the ProcessingTimeSessionWindows operator, but the session window never emits any data.
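One possible explanation, stated as an assumption rather than confirmed Flink behavior: once the bounded test input is exhausted the job shuts down, and processing-time timers that have not fired yet are dropped, so a session window only emits if the wall clock passes its end before shutdown. The plain-Scala toy model below (no Flink involved; `Session` and `SessionModel` are hypothetical names) mimics gap-based session merging so that timing can be checked by hand.

```scala
// Toy model (no Flink) of processing-time session windows with a fixed gap.
// Assumption: timers still pending when the job shuts down never fire.
final case class Session(start: Long, end: Long, count: Int)

object SessionModel {
  // Merge arrival times (ms) into sessions. An element joins the current
  // session if it arrives no later than that session's end (last arrival + gap).
  def sessions(arrivals: Seq[Long], gapMs: Long): List[Session] =
    arrivals.sorted.foldLeft(List.empty[Session]) {
      case (cur :: done, t) if t <= cur.end =>
        Session(cur.start, math.max(cur.end, t + gapMs), cur.count + 1) :: done
      case (acc, t) =>
        Session(t, t + gapMs, 1) :: acc
    }.reverse

  // Only sessions whose end is reached before shutdown get to emit.
  def fired(arrivals: Seq[Long], gapMs: Long, shutdownAtMs: Long): List[Session] =
    sessions(arrivals, gapMs).filter(_.end <= shutdownAtMs)
}
```

For example, with elements arriving every 5 ms from 0 ms to 95 ms and shutdown at 100 ms, everything merges into a single session ending at 105 ms, so `SessionModel.fired(0L until 100L by 5L, 10L, 100L)` returns an empty list.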

If I change the window to TumblingProcessingTimeWindows.of(Time.milliseconds(1)), some of the windows do emit, but many of the expected values are missing.
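The same assumption applied to 1 ms tumbling windows: each element lands in the aligned window containing its arrival time, and only windows whose end the clock reaches before shutdown emit. This toy model (plain Scala; `TumblingModel` is a hypothetical name, and window offsets are ignored) only shows how the tail of the input could be lost; it does not establish that this accounts for all of the missing values.

```scala
// Toy model (no Flink) of tumbling processing-time windows.
// Assumption: windows whose end is not reached before shutdown never emit.
object TumblingModel {
  // Start of the aligned window containing time t (t >= 0, zero offset).
  def windowStart(t: Long, sizeMs: Long): Long = t - (t % sizeMs)

  // Element count per window start, keeping only windows that close in time.
  def emitted(arrivals: Seq[Long], sizeMs: Long, shutdownAtMs: Long): Map[Long, Int] =
    arrivals
      .groupBy(windowStart(_, sizeMs))
      .map { case (start, xs) => start -> xs.size }
      .filter { case (start, _) => start + sizeMs <= shutdownAtMs }

  // Number of elements that sat in windows still open at shutdown.
  def lost(arrivals: Seq[Long], sizeMs: Long, shutdownAtMs: Long): Int =
    arrivals.count(t => windowStart(t, sizeMs) + sizeMs > shutdownAtMs)
}
```

For arrivals at 0, 0, 1, 5, 9 and 9 ms with 1 ms windows and shutdown at 9 ms, `emitted` reports counts of 2, 1 and 1 for the windows starting at 0, 1 and 5 ms, and `lost` returns 2.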

Any ideas? 

Cheers,
Clay