Hey guys,

I have a pipeline that generates two different types of data (both extend the same trait), and I need to save each type to a different sink.

    class MetricTypeSplitter(accountingTag: OutputTag[Metric], analysingTag: OutputTag[Metric])
        extends ProcessFunction[Metric, Metric] {

      val logger = LoggerFactory.getLogger(this.getClass)

      override def processElement(
          value: Metric,
          ctx: ProcessFunction[Metric, Metric]#Context,
          out: Collector[Metric]
      ): Unit = {
        out.collect(value)
        value match {
          case record: AccountingMetric => {
            logger.info(s"Sending ${record} to Accounting")
            ctx.output(accountingTag, record)
          }
          case record: AnalysingMetric => {
            logger.info(s"Sending ${record} to Analysis")
            ctx.output(analysingTag, record)
          }
          case _ => {
            logger.error(s"Don't know the type of ${value}")
          }
        }
      }
    }

And at the end of the pipeline I add the splitter:

    pipeline
      .process(new MetricTypeSplitter(accountTag, analysisTag))

So far, this works, and I can see the log lines showing which tag each metric is being sent to.

The second part, in which I capture the side output and send the data to the sink, doesn't seem to work, though:

    pipeline
      .getSideOutput(accountTag)
      .map { tuple => AccountingSink.toRow(tuple) }.name("Accounting rows")
      .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")

--
Julio Biason, Software Engineer
AZION | Deliver. Accelerate. Protect.
Office: +55 51 3083 8101 | Mobile: +55 51 99907 0554
Hi Julio,
I tried to reproduce your problem locally, but everything ran correctly. Could you share a small example job with us?

This worked for me:

    class TestingClass {
      var hello: Int = 0
    }

    class TestA extends TestingClass {
      var test: String = _
    }

    def main(args: Array[String]) {
      // set up the execution environment
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      // get input data
      val text = env.fromElements(WordCountData.WORDS: _*)

      val outputTag = OutputTag[(String, Int)]("side-output")
      val outputTag2 = OutputTag[TestingClass]("side-output2")

      val counts: DataStream[(String, Int)] = text
        // split up the lines in pairs (2-tuples) containing: (word,1)
        .flatMap(_.toLowerCase.split("\\W+"))
        .filter(_.nonEmpty)
        .map((_, 1))
        // group by the tuple field "0" and sum up tuple field "1"
        .keyBy(0)
        .sum(1)
        .process(new ProcessFunction[(String, Int), (String, Int)] {
          override def processElement(
              value: (String, Int),
              ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
              out: Collector[(String, Int)]): Unit = {
            ctx.output(outputTag, value)
            ctx.output(outputTag2, new TestingClass)
            ctx.output(outputTag2, new TestA)
          }
        })

      counts.getSideOutput(outputTag).print()
      counts.getSideOutput(outputTag2).print()

      // execute program
      env.execute("Streaming WordCount")
    }

Are the Metric classes proper POJO types?

Regards,
Timo

(In reply to Julio Biason's message of 2 April 2018, 21:53.)
Hey Timo,

To be completely honest, I _think_ they are POJOs, although I use case classes (because I want our data to be immutable).

(In reply to Timo Walther's message of Tue, Apr 3, 2018 at 10:17 AM.)
Hi Julio,
thanks for this great example. I could reproduce the problem on my machine and found the cause.

You need to store the newly created branch of your pipeline in a variable, like `val test = pipeline.process()`, so that you can access the side outputs via `test.getSideOutput(outputSimple)`. Right now your program expects a side output from the wrong operator (namely the window operation).

Regards,
Timo

(In reply to Julio Biason's message of 4 April 2018, 16:35.)
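To illustrate Timo's suggestion, here is a minimal sketch of the corrected wiring. It is not the job from the thread: the `Metric` hierarchy, its fields, the tag ids, the `Splitter` class, and the `print()` sinks are hypothetical stand-ins, and it assumes a Flink release that still ships the Scala DataStream API. The point it shows is that `getSideOutput` is called on the stream returned by `process()`, not on the upstream `pipeline`.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the Metric types mentioned in the thread.
sealed trait Metric
case class AccountingMetric(name: String, value: Long) extends Metric
case class AnalysingMetric(name: String, value: Double) extends Metric

object SideOutputSketch {
  // Tag ids are arbitrary labels chosen for this sketch.
  val accountTag: OutputTag[Metric] = OutputTag[Metric]("accounting")
  val analysisTag: OutputTag[Metric] = OutputTag[Metric]("analysis")

  // Routes each metric to the side output matching its concrete type.
  class Splitter extends ProcessFunction[Metric, Metric] {
    override def processElement(
        value: Metric,
        ctx: ProcessFunction[Metric, Metric]#Context,
        out: Collector[Metric]): Unit = value match {
      case m: AccountingMetric => ctx.output(accountTag, m)
      case m: AnalysingMetric  => ctx.output(analysisTag, m)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the upstream pipeline (the windowed stream in the thread).
    val pipeline: DataStream[Metric] = env.fromElements[Metric](
      AccountingMetric("requests", 10L),
      AnalysingMetric("latency", 1.5))

    // Keep the stream returned by process(): the side outputs are attached
    // to this operator, not to `pipeline`.
    val split = pipeline.process(new Splitter)

    // print() stands in for the real sinks.
    split.getSideOutput(accountTag).print()
    split.getSideOutput(analysisTag).print()

    env.execute("side-output sketch")
  }
}
```

Calling `pipeline.getSideOutput(accountTag)` instead would ask the upstream operator for a side output it never emits, which matches the symptom described in the first message: the splitter logs each record, but nothing reaches the sink.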