Unit testing filter function in flink

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Unit testing filter function in flink

Vishwas Siravara
Hi guys,
I want to test a function like : 
private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
dataStream.filter(new FilterFunction[GenericRecord] {
override def filter(value: GenericRecord): Boolean = {
if (value == null || value.get(StipFields.requestMessageType) == null) {
return false;
} else {
ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType)
.toString) && ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) && (value.get(StipFields
.rejectCode).asInstanceOf[Int] == 0) && !(value.get(StipFields.processingCode).toString.equals("33"))
}
}
})
}
How can I do this ? 

Best,
Vishwas 

Reply | Threaded
Open this post in threaded view
|

Re: Unit testing filter function in flink

vino yang
Hi Vishwas,

Apache Flink provides some test harness to test your application code on multiple levels of the testing pyramid.

You can use them to test your UDF. Please see more examples offered by the official documentation[1].

Best,
Vino


Vishwas Siravara <[hidden email]> 于2019年12月20日周五 上午6:27写道:
Hi guys,
I want to test a function like : 
private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
dataStream.filter(new FilterFunction[GenericRecord] {
override def filter(value: GenericRecord): Boolean = {
if (value == null || value.get(StipFields.requestMessageType) == null) {
return false;
} else {
ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType)
.toString) && ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) && (value.get(StipFields
.rejectCode).asInstanceOf[Int] == 0) && !(value.get(StipFields.processingCode).toString.equals("33"))
}
}
})
}
How can I do this ? 

Best,
Vishwas