Open method is not called with custom implementation RichWindowMapFunction

tambunanw
Hi All,

I'm experimenting with rich windowing functions and operator state. I modified the streaming stock prices example from

https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala

I created a simple windowing function like the one below:

class MyWindowFunction extends RichWindowMapFunction[StockPricex, StockPricex] {
  println("created")
  private var counter = 0

  override def open(conf: Configuration): Unit = {
    println("opened")
  }

  override def mapWindow(values: Iterable[StockPricex], out: Collector[StockPricex]): Unit = {
    // if not initialized ..

    println(counter)
    println(values)
    counter = counter + 1

  }
}

However, the open() method is not invoked when I run this code in my local environment:

    spx
      .groupBy(x => x.symbol)
      .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
      .mapWindow(new MyWindowFunction)

Any thoughts on this one?


Cheers

Re: Open method is not called with custom implementation RichWindowMapFunction

Chiwan Park-2
Hi tambunanw,

This issue is already known, and we’ll patch it soon. [1]
The problem should be solved in the next release (maybe 0.9.1).

Regards,
Chiwan Park

[1] https://issues.apache.org/jira/browse/FLINK-2257
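
Until a release containing the fix is available, one possible workaround is to guard the initialization so that it also runs on the first mapWindow call (the "if not initialized" idea hinted at in the original snippet). The following is only a minimal sketch of that pattern and was not proposed in this thread. It deliberately does not use the Flink API: RichWindowFn, LazyInitWindowFn, and LazyInitDemo are hypothetical stand-ins, and the run helper merely simulates a runtime that may or may not call open().

```scala
// Hypothetical stand-in for a rich window function; this is NOT the Flink API.
trait RichWindowFn[I, O] {
  def open(): Unit = ()
  def mapWindow(values: Iterable[I], out: O => Unit): Unit
}

// Emits the size of each window. Setup is guarded by a flag, so it runs
// exactly once whether or not the runtime calls open().
class LazyInitWindowFn extends RichWindowFn[Int, Int] {
  private var initialized = false
  var initCount = 0   // how many times the setup logic actually ran
  var windowsSeen = 0 // how many windows were processed

  private def ensureInitialized(): Unit =
    if (!initialized) {
      initCount += 1 // real setup (state handles, connections, ...) would go here
      initialized = true
    }

  override def open(): Unit = ensureInitialized()

  override def mapWindow(values: Iterable[Int], out: Int => Unit): Unit = {
    ensureInitialized() // fallback for a runtime that skips open()
    windowsSeen += 1
    out(values.size)
  }
}

object LazyInitDemo {
  // Simulated runtime: callOpen = false mimics the behaviour reported above.
  def run(fn: LazyInitWindowFn, windows: Seq[Seq[Int]], callOpen: Boolean): List[Int] = {
    val emitted = scala.collection.mutable.ListBuffer.empty[Int]
    if (callOpen) fn.open()
    windows.foreach(w => fn.mapWindow(w, (x: Int) => { emitted += x; () }))
    emitted.toList
  }

  def main(args: Array[String]): Unit = {
    val fn = new LazyInitWindowFn
    val sizes = run(fn, Seq(Seq(1, 2, 3), Seq(4, 5)), callOpen = false)
    println(s"sizes=$sizes, initCount=${fn.initCount}, windowsSeen=${fn.windowsSeen}")
  }
}
```

Because the guard makes the setup idempotent, the same function should keep working unchanged once open() is called correctly by a fixed release. Note that anything depending on the Configuration passed to open(), or on the runtime context, would still need the real fix.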

Re: Open method is not called with custom implementation RichWindowMapFunction

tambunanw
Thanks Chiwan, 


Glad to hear that. 


Cheers

Re: Open method is not called with custom implementation RichWindowMapFunction

Chiwan Park-2
I found that the patch has already been merged upstream. [1] :)

Regards,
Chiwan Park

[1] https://github.com/apache/flink/pull/855

Re: Open method is not called with custom implementation RichWindowMapFunction

tambunanw
Thanks Chiwan

Great job!

Cheers 
