Hey guys,
I have a pipeline that generates two different types of data (both implementing the same trait), and I need to write each type to a different sink.
So far, this was working with splits, but it seems that using splits together with side outputs (for the late data, which we'll later plug into late-arrival handling) causes errors, so I changed everything to side outputs.
To select a side output based on type, I did the following:
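import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.{Collector, OutputTag}
import org.slf4j.LoggerFactory

// Routes each Metric to the side output matching its concrete type,
// while still forwarding every element unchanged on the main output.
class MetricTypeSplitter(accountingTag: OutputTag[Metric], analysingTag: OutputTag[Metric])
    extends ProcessFunction[Metric, Metric] {

  // @transient lazy: the logger is created on the worker instead of being serialized with the function
  @transient private lazy val logger = LoggerFactory.getLogger(this.getClass)

  override def processElement(
      value: Metric,
      ctx: ProcessFunction[Metric, Metric]#Context,
      out: Collector[Metric]
  ): Unit = {
    // Main output: every metric, regardless of type
    out.collect(value)
    value match {
      case record: AccountingMetric =>
        logger.info(s"Sending ${record} to Accounting")
        ctx.output(accountingTag, record)
      case record: AnalysingMetric =>
        logger.info(s"Sending ${record} to Analysis")
        ctx.output(analysingTag, record)
      case _ =>
        logger.error(s"Don't know the type of ${value}")
    }
  }
}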
So far, this works: the logs show which tag each metric is being sent to. The second part, where I capture the side outputs and send the data to the sinks, doesn't seem to work, though:
.writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")
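For comparison, here is a minimal sketch of how side outputs are usually wired to sinks. It assumes the Flink Scala DataStream API (`org.apache.flink.streaming.api.scala`, which recent Flink releases deprecate), and the `Metric`, `AccountingMetric` and `AnalysingMetric` definitions, the tag ids, the sample elements and the `SideOutputWiringSketch` object are all hypothetical stand-ins, not the original job. What it tries to show: `getSideOutput` is called on the stream returned by `process(...)` (the operator that emitted to the tags), using the same tag instances passed to `ctx.output`, and the job only runs once `env.execute(...)` is called. `print(...)` stands in for the real `.writeUsingOutputFormat(...)` sinks.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the Metric types in the question.
sealed trait Metric
final case class AccountingMetric(account: String, amount: Long) extends Metric
final case class AnalysingMetric(name: String, value: Double) extends Metric

object SideOutputWiringSketch {
  // Create each tag once and reuse the same instance for emitting and reading.
  val accountingTag: OutputTag[Metric] = OutputTag[Metric]("accounting")
  val analysingTag: OutputTag[Metric] = OutputTag[Metric]("analysing")

  // Simplified splitter: routes by concrete type and emits nothing on the main output.
  class Splitter extends ProcessFunction[Metric, Metric] {
    override def processElement(
        value: Metric,
        ctx: ProcessFunction[Metric, Metric]#Context,
        out: Collector[Metric]
    ): Unit = value match {
      case m: AccountingMetric => ctx.output(accountingTag, m)
      case m: AnalysingMetric  => ctx.output(analysingTag, m)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val metrics: DataStream[Metric] =
      env.fromElements[Metric](AccountingMetric("acc-1", 10L), AnalysingMetric("latency", 2.5))

    // Side outputs belong to the operator that emitted them, so read them
    // from the stream returned by process(...), not from `metrics`.
    val routed: DataStream[Metric] = metrics.process(new Splitter)

    // print(...) stands in for the real sinks, e.g. .writeUsingOutputFormat(...)
    routed.getSideOutput(accountingTag).print("accounting")
    routed.getSideOutput(analysingTag).print("analysing")

    // Nothing runs until the job is executed.
    env.execute("side-output-sketch")
  }
}
```

Swapping `print(...)` for the actual output formats should leave the rest of the wiring unchanged.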