Hello everyone,
I'm trying to understand how to use incremental aggregation together with a window function; I've been trying unsuccessfully for a while now.
The use case: I have a stream of objects and want to count the objects within a sliding window. Within each window I then want to compare the count against a threshold value stored in Redis, and emit a new type of object containing the count, the window metadata, and a boolean indicating whether the threshold was reached.
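A plain-Scala model of that use case, with no Flink involved, may help pin down the intended semantics. It is a sketch under several assumptions: the threshold is passed in as a number (standing in for the Redis lookup), "reached" means count >= threshold, and window starts are aligned to multiples of the slide, as with the 1-hour/30-minute sliding windows used later in the thread. `Event`, `WindowResult` and `SlidingCountModel` are hypothetical names.

```scala
// Hypothetical names throughout; this is a plain-Scala model, not Flink code.
final case class Event(key: String, timestamp: Long)

final case class WindowResult(
    key: String,
    windowStart: Long,
    windowEnd: Long,
    count: Long,
    thresholdReached: Boolean)

object SlidingCountModel {
  // Starts of every sliding window [start, start + size) that contains ts,
  // assuming window starts are aligned to multiples of `slide`.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // `threshold` stands in for the value the real job would read from Redis.
  def run(events: Seq[Event], size: Long, slide: Long, threshold: Long): List[WindowResult] = {
    // Incremental step: each event only bumps a running count per
    // (key, window start); the events themselves are never buffered.
    val counts = events.foldLeft(Map.empty[(String, Long), Long]) { (acc, e) =>
      windowStarts(e.timestamp, size, slide).foldLeft(acc) { (m, start) =>
        val k = (e.key, start)
        m.updated(k, m.getOrElse(k, 0L) + 1L)
      }
    }
    // Window step: once per (key, window), combine the folded count with the
    // window metadata into a result of a different type than the accumulator.
    counts.toList
      .map { case ((key, start), count) =>
        WindowResult(key, start, start + size, count, count >= threshold)
      }
      .sortBy(r => (r.key, r.windowStart))
  }
}
```

The point of the split is that the fold keeps only one number per window, while the window step is the only place that sees the window boundaries and can build the richer output record.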
I understand the individual parts; it's getting the fold and window functions right that is giving me trouble. A stripped-down example of the code is here:
https://gist.github.com/dbciar/36e4cf7df6eae2c214efb72d30385fd8

I think one issue is that what I want to do may not be possible. Looking at the apply(accumulator, fold, window) definition here:
...I think the apply function needs to output a DataStream of the same type that the fold function uses as its accumulator. Is this correct?
At the moment I get an error that apply has no matching signature, even after changing the window function to return an Int.
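If apply(accumulator, fold, window) does require the window function to emit the accumulator type, one workaround is to make the accumulator already carry every output field, leaving the per-window fields empty until the window fires. The sketch below models that pattern in plain Scala without Flink; `CountPlaceholder`, `PlaceholderPattern`, the `String` element type and the count >= threshold rule are all hypothetical. In a real job, `fold` and `complete` would become the bodies of a FoldFunction and a WindowFunction.

```scala
// Hypothetical accumulator that doubles as the output type: the count is
// folded incrementally, the window metadata is only known when the window fires.
final case class CountPlaceholder(
    count: Long,
    key: Option[String],
    windowStart: Option[Long],
    windowEnd: Option[Long],
    thresholdReached: Option[Boolean])

object PlaceholderPattern {
  // Initial accumulator: zero count, window metadata not yet known.
  val initial: CountPlaceholder = CountPlaceholder(0L, None, None, None, None)

  // Fold step: called once per element and only touches the count.
  def fold(acc: CountPlaceholder, element: String): CountPlaceholder =
    acc.copy(count = acc.count + 1)

  // Window step: called once per window with the folded value and the window
  // metadata; returns the SAME type with the remaining fields filled in.
  // `threshold` stands in for the value that would be read from Redis.
  def complete(key: String, start: Long, end: Long, acc: CountPlaceholder, threshold: Long): CountPlaceholder =
    acc.copy(
      key = Some(key),
      windowStart = Some(start),
      windowEnd = Some(end),
      thresholdReached = Some(acc.count >= threshold))
}
```

If the Option fields are unwanted downstream, a following .map could convert the completed placeholder into a separate output type.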
I'm using version 1.1.0 artefacts from https://repository.apache.org/content/repositories/orgapacheflink-1098/.
Any help or links would be great. I've searched the mailing list but couldn't find anything close enough to this situation to use.

Thanks,
David
This message (and any attachments) is for the recipient only. NERC is subject to the Freedom of Information Act 2000 and the contents of this email and any reply you make may be disclosed by NERC unless it is exempt from release under the Act. Any material supplied to NERC may be stored in an electronic records management system.
Hello again,
Having had another go at this today, I now clearly see that I cannot give the fold/window functions an accumulator of one type and expect the window function to return a DataStream of another type. I have tried a different approach and am now getting a run-time exception, caused by using a composite case class as the fold accumulator value. My question now is whether this is possible and, if it is, how to fix the run-time exception. Again, any help is appreciated.

The exception:

    Exception in thread "main" java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.flink.api.common.typeinfo.TypeInformation;
        at org.management.observations.processing.jobs.QCBlockNull$$anon$6.<init>(QCBlockNull.scala:104)
        at org.management.observations.processing.jobs.QCBlockNull$.main(QCBlockNull.scala:104)
        at org.management.observations.processing.jobs.QCBlockNull.main(QCBlockNull.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

The code snippet is pasted below; a more neatly formatted version is in the Gist linked after it.

    // The cause of the exception is the .apply(...) below and the use of IncrementalWindowPlaceholder.
    // The fold and window classes both return type IncrementalWindowPlaceholder.
    val nullQCEvents1h = nullStream
      .keyBy("feature", "procedure")
      .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(30)))
      .apply(new IncrementalWindowPlaceholder(0, None, None, None), new QCFoldCounter(), new QCCheckNullAggregate())

    // The aggregate class I want to use with the fold/window function and emit as the DataStream type:
    case class IncrementalWindowPlaceholder(foldedValue: Double, keys: Option[Tuple], startTime: Option[Long], endTime: Option[Long]) {
      override def toString: String =
        foldedValue.toString + ',' + keys.getOrElse('-') + ',' + startTime.getOrElse('-') + ',' + endTime.getOrElse('-')
    }

Also here: https://gist.github.com/dbciar/904e2d35d6aae30214666de1176f1d7c