Side outputs never getting consumed

Julio Biason
Hey guys,

I have a pipeline that generates two different types of data (both implementing the same trait), and I need to save each type to a different sink.

So far, things were working with splits, but it seems that using splits together with side outputs (for the late data, where we'll plug in late-arrival handling) causes errors, so I changed everything to side outputs.

To select a side output based on type, I did the following:

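import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

class MetricTypeSplitter(accountingTag:OutputTag[Metric], analysingTag:OutputTag[Metric]) extends ProcessFunction[Metric, Metric] {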

  val logger = LoggerFactory.getLogger(this.getClass)

  override def processElement(
    value:Metric,
    ctx:ProcessFunction[Metric, Metric]#Context,
    out:Collector[Metric]
  ): Unit = {
    out.collect(value)
    value match {
      case record:AccountingMetric => {
        logger.info(s"Sending ${record} to Accounting")
        ctx.output(accountingTag, record)
      }
      case record:AnalysingMetric => {
        logger.info(s"Sending ${record} to Analysis")
        ctx.output(analysingTag, record)
      }
      case _ => {
        logger.error(s"Don't know the type of ${value}")
      }
    }
  }
}

And at the end of the pipeline I add the splitter:

    pipeline
      .process(new MetricTypeSplitter(accountTag, analysisTag))

So far, this works: I can see the log messages showing which tag each metric is being sent to. The second part, in which I capture the side output and send the data to the sink, doesn't seem to work, though:

    pipeline
      .getSideOutput(accountTag)
      .map { tuple => AccountingSink.toRow(tuple) }.name("Accounting rows")
      .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")
                                                                                                                                                       
And here is the problem: it seems .getSideOutput() never actually receives the side output, because the logger in AccountingSink.toRow() is never called and the data never shows up in our database (toRow() converts the Metric to a Row, and accountingSink.output returns the JDBCOutputFormat).

Any ideas what I need to do for side outputs to be actually captured?

--
Julio Biason, Software Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554

Re: Side outputs never getting consumed

Timo Walther
Hi Julio,

I tried to reproduce your problem locally, but everything ran correctly. Could you share a small example job with us?

This worked for me:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
// WordCountData comes from the flink-examples-streaming module
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
import org.apache.flink.util.Collector

class TestingClass {
  var hello: Int = 0
}

class TestA extends TestingClass {
  var test: String = _
}

object SideOutputTest {
  def main(args: Array[String]): Unit = {

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements(WordCountData.WORDS: _*)

    val outputTag = OutputTag[(String, Int)]("side-output")
    val outputTag2 = OutputTag[TestingClass]("side-output2")

    val counts: DataStream[(String, Int)] = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .sum(1)
      // emit to side outputs only; the main output of this operator stays empty
      .process(new ProcessFunction[(String, Int), (String, Int)] {
        override def processElement(
            value: (String, Int),
            ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
            out: Collector[(String, Int)]): Unit = {
          ctx.output(outputTag, value)
          ctx.output(outputTag2, new TestingClass)
          ctx.output(outputTag2, new TestA)
        }
      })

    // side outputs are requested from `counts`, the operator that emitted them
    counts.getSideOutput(outputTag).print()
    counts.getSideOutput(outputTag2).print()

    // execute program
    env.execute("Streaming WordCount")
  }
}

Are the Metric classes proper POJO types?

Regards,
Timo


On 02.04.18 at 21:53, Julio Biason wrote:

Re: Side outputs never getting consumed

Julio Biason
Hey Timo,

To be completely honest, I _think_ they are POJOs, although I use case classes (because I want our data to be immutable).
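With the Scala API, case classes do not need to follow the POJO rules: Flink derives a dedicated `CaseClassTypeInfo` for them, so immutable data is fine. A minimal sketch to check this, assuming flink-streaming-scala (Flink 1.x) is on the classpath; the two-field `AccountingMetric` below is a hypothetical stand-in for the real Metric classes:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-in for one of the immutable Metric case classes.
case class AccountingMetric(account: String, value: Long)

object TypeInfoCheck {
  // The Scala API's createTypeInformation macro analyses the case class at compile time.
  val info: TypeInformation[AccountingMetric] = createTypeInformation[AccountingMetric]

  def main(args: Array[String]): Unit = {
    println(info)
    println(s"case class type info: ${info.isInstanceOf[CaseClassTypeInfo[_]]}, fields: ${info.getArity}")
  }
}
```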

I wrote some sample code that basically reflects our pipeline: https://github.com/jbiason/FlinkSample/blob/master/src/main/scala/net/juliobiason/SideoutputSample.scala

The thing to notice is that we do the split into side outputs _after_ the window functions, because we want to split the results just before the sinks. (We had a split there before, but the job would sometimes crash with "splits can't be used with side outputs", or something along those lines.) Are we correct in assuming that there can't be side outputs once a window has been processed?
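Side outputs do work after a window; each tag just has to be read from the operator that emitted it. A minimal sketch, assuming the Flink 1.x Scala API of this era (`AssignerWithPunctuatedWatermarks` and `setStreamTimeCharacteristic` are deprecated in later releases); `Event`, `PerElementWatermarks`, `SumCounts`, `Buffers` and `BufferSink` are hypothetical names for illustration. Here the late-data tag is emitted by the window operator itself, so `getSideOutput` is called on the window's result:

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.mutable.ArrayBuffer

// Hypothetical event type: key, event-time timestamp in ms, and a count.
case class Event(key: String, ts: Long, n: Int)

// Emits a watermark right after every element, which makes lateness deterministic here.
class PerElementWatermarks extends AssignerWithPunctuatedWatermarks[Event] {
  override def extractTimestamp(e: Event, previousTimestamp: Long): Long = e.ts
  override def checkAndGetNextWatermark(e: Event, extractedTs: Long): Watermark =
    new Watermark(extractedTs)
}

// Sums the counts of a window and keeps the latest timestamp.
class SumCounts extends ReduceFunction[Event] {
  override def reduce(x: Event, y: Event): Event =
    Event(x.key, math.max(x.ts, y.ts), x.n + y.n)
}

// Test-only sink collecting into JVM-wide buffers (works only for a local, single-JVM run).
object Buffers {
  val onTime = ArrayBuffer.empty[Event]
  val late = ArrayBuffer.empty[Event]
}
class BufferSink(late: Boolean) extends SinkFunction[Event] {
  override def invoke(e: Event): Unit = {
    val buf = if (late) Buffers.late else Buffers.onTime
    buf.synchronized { buf += e }
  }
}

object LateDataAfterWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val lateTag = OutputTag[Event]("late-data")

    val windowed: DataStream[Event] = env
      .fromElements(
        Event("a", 1000L, 1),
        Event("a", 3000L, 1),
        Event("a", 12000L, 1), // watermark 12000 fires the [0, 10000) window
        Event("a", 2000L, 1))  // its window has already fired, so it is late
      .assignTimestampsAndWatermarks(new PerElementWatermarks)
      .keyBy(_.key)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .sideOutputLateData(lateTag)
      .reduce(new SumCounts)

    // The late-data tag is emitted by the window operator, so ask `windowed` for it.
    windowed.getSideOutput(lateTag).addSink(new BufferSink(late = true))
    windowed.addSink(new BufferSink(late = false))

    env.execute("late data after a window")
  }
}
```

Type-based side outputs from a ProcessFunction placed after the window follow the same rule: they are read from that ProcessFunction's result, not from the window's.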

On Tue, Apr 3, 2018 at 10:17 AM, Timo Walther <[hidden email]> wrote:

Re: Side outputs never getting consumed

Timo Walther
Hi Julio,

Thanks for this great example. I could reproduce the problem on my machine and found the cause.

You need to store the newly created branch of your pipeline in a variable, like `val test = pipeline.process(...)`, in order to access the side outputs via `test.getSideOutput(outputSimple)`. Right now your program requests the side output from the wrong operator (namely, the window operator).
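A minimal sketch of that fix, assuming the Flink 1.x Scala API. The `Metric` hierarchy, `TypeRouter` and `CollectSink` are hypothetical stand-ins (the real job writes to a JDBCOutputFormat), and `TypeRouter` is a simplified version of MetricTypeSplitter without logging:

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for the Metric trait and its two subtypes.
sealed trait Metric
case class AccountingMetric(account: String, value: Long) extends Metric
case class AnalysingMetric(key: String, value: Double) extends Metric

// Forwards every element and additionally routes it to a side output by type.
class TypeRouter(accountingTag: OutputTag[Metric], analysingTag: OutputTag[Metric])
    extends ProcessFunction[Metric, Metric] {
  override def processElement(
      value: Metric,
      ctx: ProcessFunction[Metric, Metric]#Context,
      out: Collector[Metric]): Unit = {
    out.collect(value)
    value match {
      case m: AccountingMetric => ctx.output(accountingTag, m)
      case m: AnalysingMetric  => ctx.output(analysingTag, m)
    }
  }
}

// Test-only sink collecting into a JVM-wide buffer (local, single-JVM runs only).
object CollectSink {
  val values = ArrayBuffer.empty[String]
}
class CollectSink extends SinkFunction[Metric] {
  override def invoke(value: Metric): Unit =
    CollectSink.values.synchronized { CollectSink.values += value.toString }
}

object SideOutputFix {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val accountTag = OutputTag[Metric]("accounting")
    val analysisTag = OutputTag[Metric]("analysis")

    // Stands in for the windowed pipeline of the real job.
    val pipeline: DataStream[Metric] = env.fromElements[Metric](
      AccountingMetric("acme", 10L),
      AnalysingMetric("latency", 1.5),
      AccountingMetric("initech", 3L))

    // Keep a handle on the operator that emits the side outputs...
    val split = pipeline.process(new TypeRouter(accountTag, analysisTag))

    // ...and request the side output from that handle, not from `pipeline`.
    split.getSideOutput(accountTag).addSink(new CollectSink)

    env.execute("side output fix")
  }
}
```

Calling `pipeline.getSideOutput(accountTag)` instead still compiles and runs, but the resulting stream stays empty, since the upstream operator never emits anything under that tag.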

Regards,
Timo


On 04.04.18 at 16:35, Julio Biason wrote: