Unit testing a filter function in Flink
Posted by Vishwas Siravara on Dec 19, 2019; 10:26pm
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Unit-testing-filter-function-in-flink-tp31778.html
Hi guys,
I want to unit test a function like this:
private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
  dataStream.filter(new FilterFunction[GenericRecord] {
    override def filter(value: GenericRecord): Boolean = {
      if (value == null || value.get(StipFields.requestMessageType) == null) {
        false
      } else {
        ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType).toString) &&
          ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) &&
          (value.get(StipFields.rejectCode).asInstanceOf[Int] == 0) &&
          !(value.get(StipFields.processingCode).toString.equals("33"))
      }
    }
  })
}
How can I do this?
Best,
Vishwas