Hello everyone,
I'm relatively new to Apache Flink and Scala, and am just getting to grips with some of the basic functionality both provide. However, I've hit a wall trying to implement a custom WindowFunction over a keyed countWindow, and hoped someone might have a pointer. The full code is in a Gist (https://gist.github.com/dbciar/37df92d321c180f5e96e5e3f17806c91), and I am using Flink 1.0.3 with Scala 2.11.
So my workflow is that I read string values from a Kafka queue, parse these into a DataStream of RawObservation type using a custom map, and then create a keyed countWindow stream.
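The parse-and-key stages of this workflow can be checked in plain Scala before wiring them into Flink. This is a minimal sketch that assumes a hypothetical comma-separated line format and hypothetical RawObservation fields (sensorId, property, time, value). The real class is in the Gist. Here groupBy only approximates keyBy: in Flink, each key's elements are routed to that key's own count windows as they arrive.

```scala
// Plain-Scala model of "parse strings, then key by a compound key" (no Flink dependency).
// The RawObservation fields and the line format are hypothetical; the real class is in the Gist.
case class RawObservation(sensorId: String, property: String, time: Long, value: Double)

object WorkflowModel {
  // Parse one "sensorId,property,time,value" line; malformed input yields None instead of throwing.
  def parse(line: String): Option[RawObservation] =
    line.split(",").map(_.trim) match {
      case Array(sensor, prop, t, v) =>
        try Some(RawObservation(sensor, prop, t.toLong, v.toDouble))
        catch { case _: NumberFormatException => None }
      case _ => None
    }

  // Group parsed observations by the compound key (sensorId, property).
  // groupBy keeps arrival order within each group, similar to how each key's elements
  // reach its own windows in order.
  def keyed(lines: Seq[String]): Map[(String, String), Seq[RawObservation]] =
    lines.flatMap(l => parse(l).toList).groupBy(o => (o.sensorId, o.property))
}
```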
The problem is that when I try to apply a custom WindowFunction, the IDE reports the error "Cannot resolve symbol apply" on the ".apply" function. I suspect my WindowFunction is not implemented correctly and does not match the signature that apply expects. I think this because when I remove the '[String]' return type from apply ('.apply[String]'), I get the following errors:
--------------------------------------------------------------------------------------------------------
Unspecified value parameters: foldFunction: (NotInferedR, RawObservation) => NotInferedR, windowFunction: (Tuple, GlobalWindow, Iterable[NotInferedR], Collector[NotInferedR]) => Unit
Unspecified value parameters: foldFunction: FoldFunction[RawObservation, NotInferedR], function: WindowFunction[NotInferedR, NotInferedR, Tuple, GlobalWindow]
Unspecified value parameters: function: WindowFunction[RawObservation, NotInferedR, Tuple, GlobalWindow]
Unspecified value parameters: windowFunction: (Tuple, GlobalWindow, Iterable[RawObservation], Collector[NotInferedR]) => Unit
Type mismatch, expected: (Tuple, GlobalWindow, Iterable[RawObservation], Collector[NotInferedR]) => Unit, actual: SequentialDeltaCheck
Type mismatch, expected: WindowFunction[RawObservation, NotInferedR, Tuple, GlobalWindow], actual: SequentialDeltaCheck
--------------------------------------------------------------------------------------------------------
As an aside, when defining the WindowFunction I wasn't sure whether I was correctly setting the key type to Tuple2, since it is a compound key.
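The compound-key question comes down to Scala's type parameters. As far as I know, when a Flink Scala stream is keyed by field positions or names (for example keyBy(0, 1)), the resulting KeyedStream's key type is the abstract org.apache.flink.api.java.tuple.Tuple, even though each key is a Tuple2 at runtime. Because WindowFunction is invariant in its KEY parameter, a function declared with Tuple2 does not match. The sketch below models this without Flink. Product stands in for Tuple, and KeyedFn is a hypothetical stand-in for WindowFunction.

```scala
// Plain-Scala model of the key-typing issue. Product stands in for Flink's abstract
// org.apache.flink.api.java.tuple.Tuple; a Scala Tuple2 plays the concrete runtime key.
object KeyTypingModel {
  // Hypothetical stand-in, invariant in KEY just like Flink's WindowFunction[IN, OUT, KEY, W].
  trait KeyedFn[KEY] { def describe(key: KEY): String }

  // A stream keyed by field positions/names only promises the abstract key type.
  def runWithAbstractKey(f: KeyedFn[Product], key: Product): String = f.describe(key)

  // Declared against the abstract type, so it is accepted. Fields are read by position
  // (the Flink analogue would be key.getField[String](0)).
  val byPosition: KeyedFn[Product] = new KeyedFn[Product] {
    def describe(key: Product): String =
      (0 until key.productArity).map(i => key.productElement(i).toString).mkString("/")
  }

  // A KeyedFn[(String, String)] would NOT be accepted by runWithAbstractKey, because
  // KeyedFn is invariant. This is the same reason a WindowFunction declared with Tuple2 fails.
}
```

If a statically typed key is preferred, keying with a selector function such as keyBy(o => (o.sensorId, o.property)), with those field names hypothetical, should give a KeyedStream whose key type is the Scala tuple (String, String). The WindowFunction would then declare that type as its KEY.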
Any help, or pointers to something I may have missed in the docs, would be great; I've had a look through but nothing jumped out at me. I could probably do this with the fold transformation instead, but I wanted to try window functions first.
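The fold route and the WindowFunction route differ in one key way. A fold function sees one element at a time and threads an accumulator through the window, whereas a WindowFunction receives the whole window as an Iterable. Below is a minimal plain-Scala sketch of the fold style. It uses a hypothetical delta check (counting consecutive jumps larger than maxDelta) because the real logic is not shown in this thread, and FoldModel and Acc are illustrative names.

```scala
// Plain-Scala model of a fold-style window computation (no Flink dependency).
// The fold step sees one element at a time and carries an accumulator.
// The delta check itself is a hypothetical example.
object FoldModel {
  // Accumulator: the previous value seen (if any) and how many large jumps were found so far.
  final case class Acc(prev: Option[Double], jumps: Int)

  // One fold step: compare the new value with the previous one and update the count.
  def step(maxDelta: Double)(acc: Acc, value: Double): Acc = {
    val jumped = acc.prev.exists(p => math.abs(value - p) > maxDelta)
    Acc(Some(value), if (jumped) acc.jumps + 1 else acc.jumps)
  }

  // Fold one window's values, starting from an empty accumulator.
  def countJumps(window: Seq[Double], maxDelta: Double): Int =
    window.foldLeft(Acc(None, 0))((acc, v) => step(maxDelta)(acc, v)).jumps
}
```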
Thanks, David
The workflow:
val stream: DataStream[RawObservation] = env
The WindowFunction:
class SequentialDeltaCheck extends WindowFunction[RawObservation, String, String, TimeWindow]{
This message (and any attachments) is for the recipient only. NERC is subject to the Freedom of Information Act 2000 and the contents of this email and any reply you make may be disclosed by NERC unless it is exempt from release under the Act. Any material supplied to NERC may be stored in an electronic records management system.
Hi David,
countWindow(size, slide) creates a GlobalWindow, not a TimeWindow. Also, you have to use Tuple instead of Tuple2 as the key type.
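With both corrections from this reply applied, the WindowFunction's type parameters become [RawObservation, String, Tuple, GlobalWindow]. Its apply method takes (key, window, input, out) in that order, matching the parameter lists in the IDE errors above. The sketch below is self-contained so it compiles without Flink. Window, GlobalWindow, Collector and WindowFunction are minimal local stand-ins for the Flink types named in the comments, and Product stands in for Tuple. The RawObservation fields, the maxDelta constructor parameter and the delta-check logic are all hypothetical, since the real body of SequentialDeltaCheck is only in the Gist.

```scala
// Stand-ins so this sketch compiles without Flink on the classpath. In a real job, import instead:
//   org.apache.flink.streaming.api.scala.function.WindowFunction
//   org.apache.flink.streaming.api.windowing.windows.GlobalWindow
//   org.apache.flink.api.java.tuple.Tuple   (modelled here as Product)
//   org.apache.flink.util.Collector
trait Window
final class GlobalWindow extends Window
trait Collector[T] { def collect(record: T): Unit }
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

// Hypothetical record type; the real RawObservation is in the linked Gist.
case class RawObservation(sensorId: String, property: String, time: Long, value: Double)

// KEY = Tuple (Product here) and W = GlobalWindow, because countWindow is built on GlobalWindows.
class SequentialDeltaCheck(maxDelta: Double)
    extends WindowFunction[RawObservation, String, Product, GlobalWindow] {
  override def apply(key: Product, window: GlobalWindow,
                     input: Iterable[RawObservation], out: Collector[String]): Unit = {
    // Hypothetical check: emit one message per consecutive pair whose values differ too much.
    input.toSeq.sliding(2).foreach {
      case Seq(a, b) if math.abs(b.value - a.value) > maxDelta =>
        out.collect(s"$key: jump of ${b.value - a.value} at t=${b.time}")
      case _ => ()
    }
  }
}
```

In a real job, with Flink's imports replacing the stand-ins, an instance would be passed to .apply on the windowed stream. The OUT type (String here) is what .apply[String] refers to.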
Thanks and Regards,
Vishnu Viswanath

On Wed, Jul 13, 2016 at 7:07 AM, Ciar, David B. <[hidden email]> wrote:
Hi Vishnu,
Thank you for the pointers/modified example, that was really helpful and it is working as expected now.
I took another look through the documentation and found the "Recipes for building windows" sub-section of the "Window" section for streaming data, which shows countWindow being built on a GlobalWindow. After applying your changes and seeing them work, that section made more sense to me.
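The recipe mentioned here can be pinned down with a short simulation. As far as I can tell from Flink's source, countWindow(size, slide) on a KeyedStream is shorthand for window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide)), which is why the window type is GlobalWindow. The plain-Scala sketch below has no Flink dependency, and its object and method names are hypothetical. Under that assumption, it models what one key's window function receives at each firing.

```scala
// Plain-Scala simulation (no Flink dependency) of what ONE key's window function receives
// from countWindow(size, slide), modelled as GlobalWindows + CountTrigger.of(slide)
// + CountEvictor.of(size). Object and method names are hypothetical.
object CountWindowModel {
  def slidingCountWindows[T](elements: Seq[T], size: Int, slide: Int): Seq[Seq[T]] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    // The trigger fires after the slide-th, 2*slide-th, ... element seen for this key.
    (slide to elements.length by slide).map { end =>
      // At each firing the evictor keeps only the last `size` elements received so far.
      elements.slice(math.max(0, end - size), end)
    }
  }
}
```

Under this model, with size = 3 and slide = 1 the first two firings see only one and two elements, so a WindowFunction that assumes a full window may want to check input.size first.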
Cheers,
David

From: Vishnu Viswanath <[hidden email]>
Sent: 13 July 2016 12:35:44
To: [hidden email]
Subject: Re: countWindow custom WindowFunction