Hi All,
I'm trying to create some experiment with rich windowing function and operator state. I modify the streaming stock prices from https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala I create the simple windowing function like below class MyWindowFunction extends RichWindowMapFunction[StockPricex, StockPricex] { println("created") private var counter = 0 override def open(conf: Configuration): Unit = { println("opened") } override def mapWindow(values: Iterable[StockPricex], out: Collector[StockPricex]): Unit = { // if not initialized .. println(counter) println(values) counter = counter + 1 } } However the open() method is not invoked when i try to run this code on my local environment spx .groupBy(x => x.symbol) .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS)) .mapWindow(new MyWindowFunction) Any thought on this one ? Cheers |
Hi tambunanw,
The issue is already known and we’ll patch soon. [1] In next release (maybe 0.9.1), the problem will be solved. Regards, Chiwan Park [1] https://issues.apache.org/jira/browse/FLINK-2257 > On Jul 3, 2015, at 4:57 PM, tambunanw <[hidden email]> wrote: > > Hi All, > > I'm trying to create some experiment with rich windowing function and > operator state. I modify the streaming stock prices from > > https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala > > I create the simple windowing function like below > > class MyWindowFunction extends RichWindowMapFunction[StockPricex, > StockPricex] { > println("created") > private var counter = 0 > > override def open(conf: Configuration): Unit = { > println("opened") > } > > override def mapWindow(values: Iterable[StockPricex], out: > Collector[StockPricex]): Unit = { > // if not initialized .. > > println(counter) > println(values) > counter = counter + 1 > > } > } > > However the open() method is not invoked when i try to run this code on my > local environment > > spx > .groupBy(x => x.symbol) > .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, > TimeUnit.SECONDS)) > .mapWindow(new MyWindowFunction) > > Any thought on this one ? > > > Cheers > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Open-method-is-not-called-with-custom-implementation-RichWindowMapFunction-tp1924.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
Thanks Chiwan, Glad to hear that. Cheers On Fri, Jul 3, 2015 at 3:24 PM, Chiwan Park <[hidden email]> wrote: Hi tambunanw, |
I found that the patch had been merged to upstream. [1] :)
Regards, Chiwan Park [1] https://github.com/apache/flink/pull/855 > On Jul 3, 2015, at 5:26 PM, Welly Tambunan <[hidden email]> wrote: > > Thanks Chiwan, > > > Glad to hear that. > > > Cheers > > On Fri, Jul 3, 2015 at 3:24 PM, Chiwan Park <[hidden email]> wrote: > Hi tambunanw, > > The issue is already known and we’ll patch soon. [1] > In next release (maybe 0.9.1), the problem will be solved. > > Regards, > Chiwan Park > > [1] https://issues.apache.org/jira/browse/FLINK-2257 > > > On Jul 3, 2015, at 4:57 PM, tambunanw <[hidden email]> wrote: > > > > Hi All, > > > > I'm trying to create some experiment with rich windowing function and > > operator state. I modify the streaming stock prices from > > > > https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala > > > > I create the simple windowing function like below > > > > class MyWindowFunction extends RichWindowMapFunction[StockPricex, > > StockPricex] { > > println("created") > > private var counter = 0 > > > > override def open(conf: Configuration): Unit = { > > println("opened") > > } > > > > override def mapWindow(values: Iterable[StockPricex], out: > > Collector[StockPricex]): Unit = { > > // if not initialized .. > > > > println(counter) > > println(values) > > counter = counter + 1 > > > > } > > } > > > > However the open() method is not invoked when i try to run this code on my > > local environment > > > > spx > > .groupBy(x => x.symbol) > > .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, > > TimeUnit.SECONDS)) > > .mapWindow(new MyWindowFunction) > > > > Any thought on this one ? > > > > > > Cheers > > > > > > > > -- > > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Open-method-is-not-called-with-custom-implementation-RichWindowMapFunction-tp1924.html > > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > > > > > > > > > -- > Welly Tambunan > Triplelands > > http://weltam.wordpress.com > http://www.triplelands.com |
Thanks Chiwan Great Job ! Cheers On Fri, Jul 3, 2015 at 3:32 PM, Chiwan Park <[hidden email]> wrote: I found that the patch had been merged to upstream. [1] :) |
Free forum by Nabble | Edit this page |