ValueState(descriptor) using a custom case class


Martin Frank Hansen
Hi,

I am trying to calculate several different metrics, using the state backend to track whether events have been seen before. I am using a ProcessWindowFunction, but nothing gets through; it is as if the process function is ignored. Is it not possible to store a custom case class as ValueState? Or do I need to implement a serializer for the case class? Or ...

Any help is much appreciated.

My code: 

class MetricsProcessFunction extends ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow] {

  var pageviewMetricState: ValueState[PageviewBasedMetrics] = _

  override def open(parameters: Configuration): Unit = {
    pageviewMetricState = this.getRuntimeContext.getState(
      new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", classOf[PageviewBasedMetrics]))
  }

  override def process(key: PageviewBasedMetricsGroup, context: Context, elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {
    // read the current state; note that value() returns null until the first update for this key
    val pwbm = pageviewMetricState.value()

    if (elements.head.event.getOrElse("") == "page_view") {
      val tmp = pwbm.pageviews + 1
      val tmpPBM = pwbm.copy(
        pageviews = tmp,
        startTime = Instant.ofEpochMilli(context.window.getStart).atOffset(ZoneOffset.UTC).toInstant,
        endTime = Instant.ofEpochMilli(context.window.getEnd).atOffset(ZoneOffset.UTC).toInstant)

      pageviewMetricState.update(SnowplowPickler.write(tmpPBM))
    }
    out.collect(SnowplowPickler.read(pageviewMetricState.value()))
  }
}

object AggregateMultipleMetrics {

  def main(args: Array[String]): Unit = {
    val env: StreamEnvironment = StreamEnvironment.getStreamEnvironment("AggregateMetrics")
    val executionEnv: StreamExecutionEnvironment = env.streamExecutionEnv
    val appProps: Properties = env.appProps

    val inputStream: String = appProps.getProperty("input_topic")
    val outputTopic1Min: String = appProps.getProperty("output_topic_1_min")
    val outputSerializer1Min: KafkaSerializationSchemaPageviewBasedMetrics = new KafkaSerializationSchemaPageviewBasedMetrics(outputTopic1Min)
    val partitioner: FlinkKafkaPartitioner[PageviewBasedMetrics] = new FlinkKafkaKeyPartitioner[PageviewBasedMetrics]()

    val mainDataStream = new SnowplowEventSource().getStream(inputStream, appProps, executionEnv)

    val target1Min: SinkFunction[PageviewBasedMetrics] = new KafkaSink[PageviewBasedMetrics, KafkaSerializationSchemaPageviewBasedMetrics]().getSinkFunction(
      outputTopic1Min,
      outputSerializer1Min,
      partitioner,
      appProps)

    mainDataStream
      .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
      .timeWindow(Time.minutes(1))
      .process(new MetricsProcessFunction)
      .addSink(target1Min)

    // execute the program
    executionEnv.execute("Count pageview-based metrics")
  }
}




--

Martin Frank Hansen

Data Engineer
Digital Service
M: +45 25 57 14 18
E: [hidden email]


Re: ValueState(descriptor) using a custom case class

Dawid Wysakowicz

Hi Martin,

I am not sure what the exact problem is. Is it that the ProcessFunction is not invoked, or is the problem with the values in your state?

As for the question about the case class and ValueState: the best way is to provide the TypeInformation explicitly. If you do not provide it, the ValueStateDescriptor will use the Java type extraction stack, which cannot handle case classes well; if I am not mistaken, they will end up serialized with a generic serializer via Kryo.

You can create a proper TypeInformation for a Scala case class like this:

    import org.apache.flink.streaming.api.scala._ // important: brings the implicit Scala type extraction into scope

    val caseClassTypeInfo = implicitly[TypeInformation[PageviewBasedMetrics]]
    this.pageviewMetricState = this.getRuntimeContext
      .getState(new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics", caseClassTypeInfo))
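
Equivalently, with the same import in scope, the createTypeInformation macro from the Scala API should give you the same result (a one-line sketch):

    val caseClassTypeInfo = createTypeInformation[PageviewBasedMetrics]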

If the problem is that the function is not being invoked, I'd recommend checking which TimeCharacteristic you are using (I don't know exactly what happens inside StreamEnvironment.getStreamEnvironment). If you use ProcessingTime, the results will be emitted only after a minute passes (you are using Time.minutes(1)).
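
For example, a minimal sketch of setting it explicitly, assuming the usual StreamExecutionEnvironment setup:

    import org.apache.flink.streaming.api.TimeCharacteristic

    // processing time: windows fire at one-minute wall-clock boundaries
    executionEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // with TimeCharacteristic.EventTime instead, the source must emit watermarks
    // (e.g. via assignTimestampsAndWatermarks), otherwise the windows never fire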

Hope that helps.

Best,

Dawid




Re: ValueState(descriptor) using a custom case class

Martin Frank Hansen
Hi Dawid, 

Thanks for your reply, much appreciated.

I tried your implementation with an explicit TypeInformation, but still nothing gets through. There are no errors either; the job simply runs without sending data to the sink. I have checked that there is data in the input topic, and I have used this code for a similar job (with a simple String as the ValueState type).

I have added a print statement to the process function, but nothing gets written to the console, which suggests that the method is never called, even though it should be invoked by this:

mainDataStream
  .keyBy[PageviewBasedMetricsGroup]((e: Event) => Util.getPageviewBasedMetricsGroup(e))
  .timeWindow(Time.minutes(1))
  .process(new MetricsProcessFunction)
  .addSink(target1Min)
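
The print statement is just the first line of the process method, roughly like this (a sketch):

    override def process(key: PageviewBasedMetricsGroup, context: Context, elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {
      // debug print; this never shows up in the console
      println(s"process() called for key $key with ${elements.size} elements")
      // ... rest of the method as posted above
    }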

Could the problem be in the open method?

best regards


Re: ValueState(descriptor) using a custom case class

Martin Frank Hansen
On another note, the case class in question has about 40 fields; is there a maximum limit on the number of fields?
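
For illustration, I would expect something like this to still derive a proper case-class TypeInformation (a sketch; Wide is a hypothetical stand-in for my 40-field class):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._

    case class Wide(f1: Int, f2: String, f3: Long) // imagine ~40 fields here

    val wideInfo = implicitly[TypeInformation[Wide]]
    println(wideInfo) // a CaseClassTypeInfo here would mean no Kryo fallback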

best regards
