Hi Julio,
I tried to reproduce your problem locally, but everything ran correctly. Could you share a small example job with us?
This worked for me:
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
import org.apache.flink.util.Collector

class TestingClass { var hello: Int = 0 }
class TestA extends TestingClass { var test: String = _ }

object SideOutputTest {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements(WordCountData.WORDS: _*)

    val outputTag = OutputTag[(String, Int)]("side-output")
    val outputTag2 = OutputTag[TestingClass]("side-output2")

    val counts: DataStream[(String, Int)] = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .sum(1)
      .process(new ProcessFunction[(String, Int), (String, Int)] {
        override def processElement(
            value: (String, Int),
            ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
            out: Collector[(String, Int)]): Unit = {
          // emit to both side outputs; TestA is accepted because it extends TestingClass
          ctx.output(outputTag, value)
          ctx.output(outputTag2, new TestingClass)
          ctx.output(outputTag2, new TestA)
        }
      })

    counts.getSideOutput(outputTag).print()
    counts.getSideOutput(outputTag2).print()

    // execute program
    env.execute("Streaming WordCount")
  }
}
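One detail that may explain the empty sink in the quoted code: a side output belongs to the operator that emits it through ctx.output(...), so getSideOutput has to be called on the stream returned by .process(new MetricTypeSplitter(...)), as with counts in the example above, and not on pipeline. Below is a minimal local sketch of that, assuming a sealed Metric trait with two case classes and a test-only CollectSink; all of these names are hypothetical stand-ins, not your actual classes.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the Metric hierarchy from the question.
sealed trait Metric { def v: Int }
case class AccountingMetric(v: Int) extends Metric
case class AnalysingMetric(v: Int) extends Metric

// Test-only sink: appends every value to a JVM-wide queue.
// This only works with a local (same-JVM) execution environment.
class CollectSink extends SinkFunction[Int] {
  override def invoke(value: Int): Unit = CollectSink.values.add(value)
}

object CollectSink {
  val values = new ConcurrentLinkedQueue[Int]()
  def sorted: List[Int] = values.asScala.toList.sorted
}

object SideOutputRouting {
  val accountingTag = OutputTag[Metric]("accounting")
  val analysingTag = OutputTag[Metric]("analysing")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val pipeline: DataStream[Metric] =
      env.fromElements[Metric](AccountingMetric(1), AnalysingMetric(2), AccountingMetric(3))

    // Keep a reference to the stream returned by process(): the side outputs
    // belong to this operator, because it is the one calling ctx.output(...).
    val split: DataStream[Metric] = pipeline.process(new ProcessFunction[Metric, Metric] {
      override def processElement(
          value: Metric,
          ctx: ProcessFunction[Metric, Metric]#Context,
          out: Collector[Metric]): Unit = {
        out.collect(value)
        value match {
          case m: AccountingMetric => ctx.output(accountingTag, m)
          case m: AnalysingMetric  => ctx.output(analysingTag, m)
        }
      }
    })

    // Read the side output from `split`. Calling pipeline.getSideOutput(accountingTag)
    // instead would ask the upstream source for the tag, which it never emits.
    split.getSideOutput(accountingTag).map(_.v).addSink(new CollectSink)

    env.execute("side-output routing sketch")
  }
}
```

Applied to your job, that would mean something like val split = pipeline.process(new MetricTypeSplitter(accountTag, analysisTag)) followed by split.getSideOutput(accountTag).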
Are the Metric classes proper POJO types?
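For the POJO question, one quick check is to ask Flink's Java TypeExtractor how it classifies a class: a PojoTypeInfo means the POJO rules (public class, public no-argument constructor, fields either public or reachable through getters/setters) are met, while anything else usually means a fallback to a Kryo-based GenericTypeInfo. This is a sketch that reuses TestingClass and TestA from the example and adds a hypothetical NotAPojo class; you would substitute your Metric classes. It only checks the Java extractor; the Scala API derives type information through its own macro, which handles case classes separately (as CaseClassTypeInfo).

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TypeExtractor}

// Classes from the example: public, no-arg constructor, `var` fields
// (Scala generates hello()/hello_$eq accessors, which Flink accepts as getter/setter).
class TestingClass { var hello: Int = 0 }
class TestA extends TestingClass { var test: String = _ }

// Hypothetical counter-example: no no-arg constructor, so it cannot be a POJO.
class NotAPojo(val id: Int)

object PojoCheck {
  // True if Flink's Java type extractor analyses `clazz` as a POJO;
  // otherwise it typically falls back to a Kryo-backed GenericTypeInfo.
  def isPojo(clazz: Class[_]): Boolean =
    TypeExtractor.createTypeInfo(clazz).isInstanceOf[PojoTypeInfo[_]]

  def main(args: Array[String]): Unit = {
    for (c <- Seq(classOf[TestingClass], classOf[TestA], classOf[NotAPojo])) {
      val info: TypeInformation[_] = TypeExtractor.createTypeInfo(c)
      println(s"${c.getSimpleName}: $info (POJO = ${isPojo(c)})")
    }
  }
}
```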
Regards,
Timo
On 02.04.18 at 21:53, Julio Biason wrote:
Hey guys,
I have a pipeline that generates two different types of data (both use the same trait), and I need to save each type to a different sink.
So far, things were working with splits, but it seems that using splits together with side outputs (for the late data, which we'll plug into late-arrival handling) causes errors, so I changed everything to side outputs.
To select a side output based on type, I did the following:
class MetricTypeSplitter(accountingTag: OutputTag[Metric], analysingTag: OutputTag[Metric]) extends ProcessFunction[Metric, Metric] {
  val logger = LoggerFactory.getLogger(this.getClass)
  override def processElement(
      value: Metric,
      ctx: ProcessFunction[Metric, Metric]#Context,
      out: Collector[Metric]
  ): Unit = {
    out.collect(value)
    value match {
      case record: AccountingMetric => {
        logger.info(s"Sending ${record} to Accounting")
        ctx.output(accountingTag, record)
      }
      case record: AnalysingMetric => {
        logger.info(s"Sending ${record} to Analysis")
        ctx.output(analysingTag, record)
      }
      case _ => {
        logger.error(s"Don't know the type of ${value}")
      }
    }
  }
}
And at the end of the pipeline I add the splitter:
pipeline
  .process(new MetricTypeSplitter(accountTag, analysisTag))
So far, this works: I can see the logs showing which tag each metric is being sent to. The second part, in which I capture the side output and send the data to the sink, doesn't seem to work, though:
pipeline
  .getSideOutput(accountTag)
  .map { tuple => AccountingSink.toRow(tuple) }.name("Accounting rows")
  .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")
And here is the problem: it seems .getSideOutput() never actually receives the side output, because the logger in AccountingSink.toRow() is never triggered and the data never shows up in our database (toRow() converts the Metric to a Row, and accountingSink.output returns the JDBCOutputFormat).
Any ideas what I need to do for the side outputs to actually be captured?
--
Julio Biason, Software Engineer
AZION | Deliver. Accelerate. Protect. Office: +55 51 3083 8101 | Mobile: +55 51 99907 0554