Hi,
I am trying to calculate several metrics, using the state backend to check whether events have been seen before. I am using a ProcessWindowFunction, but nothing gets through; it is as if the .process function is ignored. Is it not possible to store a custom case class as ValueState? Or do I need to implement a serializer for the case class? Or ...

Any help is much appreciated.

My code:

class MetricsProcessFunction extends ProcessWindowFunction[Event,PageviewBasedMetrics,PageviewBasedMetricsGroup,TimeWindow]() {
  override def open(parameters: Configuration): Unit = {

object AggregateMultipleMetrics {
Hi Martin,

I am not sure what the exact problem is. Is it that the ProcessFunction is not invoked, or is the problem with the values in your state?

As for the question about the case class and ValueState: the best way to do it is to provide the TypeInformation explicitly. If you do not provide the TypeInformation, the ValueStateDescriptor will use the Java type extraction stack, which cannot handle case classes well, and if I am not mistaken they will end up serialized with a generic serializer using Kryo.

You can create a proper TypeInformation for a Scala case class like this:

import org.apache.flink.streaming.api.scala._ // important to import for the implicit Scala type extraction

val caseClassTypeInfo = implicitly[TypeInformation[PageviewBasedMetrics]]

If the problem is that the function is not being invoked, I'd recommend checking which TimeCharacteristic you are using (I don't know exactly what is going on in StreamEnvironment.getStreamEnvironment). If you use ProcessingTime, the results will be emitted only after a minute passes. (You are using TimeWindow.minutes(1).)

Hope that helps.

Best,
Dawid

On 18/09/2020 10:42, Martin Frank Hansen wrote:
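
To illustrate the TypeInformation suggestion in the reply above, here is a minimal sketch, not the original poster's code. Event and PageviewBasedMetrics are reduced to hypothetical two-field stand-ins, the key is assumed to be a plain String, and the class name PageviewCountSketch and the state name "pageview-metrics" are made up for illustration. It hands the implicitly derived TypeInformation to the ValueStateDescriptor and reads the per-key state through context.globalState.

```scala
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._ // brings the implicit Scala type extraction into scope
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the case classes in the thread (the real ones have ~40 fields).
case class Event(pageId: String, userId: String)
case class PageviewBasedMetrics(pageId: String, pageviews: Long)

// Assumption: the stream is keyed by a String (e.g. the page id).
class PageviewCountSketch
    extends ProcessWindowFunction[Event, PageviewBasedMetrics, String, TimeWindow] {

  // Descriptor built with explicit Scala TypeInformation, so the case class
  // is not routed through the Java type extraction and a generic Kryo serializer.
  @transient private lazy val metricsDescriptor =
    new ValueStateDescriptor[PageviewBasedMetrics](
      "pageview-metrics", // hypothetical state name
      implicitly[TypeInformation[PageviewBasedMetrics]])

  override def process(
      key: String,
      context: Context,
      elements: Iterable[Event],
      out: Collector[PageviewBasedMetrics]): Unit = {
    // Per-key state that survives across windows.
    val state = context.globalState.getState(metricsDescriptor)

    // value() returns null until the first update for this key.
    val previous = Option(state.value()).map(_.pageviews).getOrElse(0L)
    val updated = PageviewBasedMetrics(key, previous + elements.size)

    state.update(updated)
    out.collect(updated)
  }
}
```

The function only runs when a window fires, so with processing time and a one-minute window the first output is expected roughly a minute after the job starts receiving data.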
Hi Dawid,

Thanks for your reply, much appreciated.

I tried using your implementation for TypeInformation, but still nothing gets through. There are no errors either; the job simply runs without sending data to the sink. I have checked that there is data in the input topic, and I have used the code to run a similar job (with a simple String type as ValueState).

I have added a print statement to the process function, but nothing gets written to the console, which suggests that the method is never called, although it should be by this:

mainDataStream

Could the problem be in the open method?

Best regards

On Fri, 18 Sep 2020 at 12:30, Dawid Wysakowicz <[hidden email]> wrote:

Martin Frank Hansen
Data Engineer
Another note: the case class in question has about 40 fields. Is there a maximum limit on the number of fields?

Best regards

On Fri, 18 Sep 2020 at 13:05, Martin Frank Hansen <[hidden email]> wrote: