Hi guys,
I want to test a function like this:
private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
  dataStream.filter(new FilterFunction[GenericRecord] {
    override def filter(value: GenericRecord): Boolean = {
      if (value == null || value.get(StipFields.requestMessageType) == null) {
        false
      } else {
        ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType).toString) &&
          ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) &&
          (value.get(StipFields.rejectCode).asInstanceOf[Int] == 0) &&
          !(value.get(StipFields.processingCode).toString.equals("33"))
      }
    }
  })
}
How can I do this?
Best,
Vishwas