Proper way to call a Python function in WindowFunction.apply()


Dongwon Kim-2
Hi all,

What is the proper way to call a Python function in WindowFunction.apply()?

I want to apply a Python function to values in a fixed-size sliding window.
I'm trying this because:
- I'm currently working on time-series prediction using deep learning, which is why I need a sliding window to get the latest N items from the unbounded data stream.
- I already have a DNN written using Keras on top of Theano (both are Python libraries) in order to exploit Nvidia's CUDA library.
- There is no Python DataStream API, so I tried the Scala DataStream API.
- PySpark's structured streaming does not allow me to define a UDAF (see the question I posted on Stack Overflow about it: http://stackoverflow.com/questions/42747236/how-to-define-udaf-over-event-time-windows-in-pyspark-2-1-0).
- The Spark DStream API does not look promising for this case due to its lack of support for count windows.

For these reasons, I hastily wrote a toy example to see whether applying Python methods to values in the sliding window is feasible.
--------
import jep.Jep
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

class WindowFunction extends AllWindowFunction[String, String, GlobalWindow] {
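  // NOTE: Jep is instantiated eagerly as a field here, so Flink will try to
  // serialize it together with this function when the job is submitted.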
  val jep = new Jep()
  jep.runScript("prediction.py")

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ...
  }
}

object main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("localhost", 9999)
      .countWindowAll(5, 1)
      .apply(new WindowFunction())
      .print()
    env.execute()
  }
}
--------

Now I'm facing a serialization error with the following messages:
--------
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
at org.apache.flink.streaming.api.scala.AllWindowedStream.clean(AllWindowedStream.scala:568)
at org.apache.flink.streaming.api.scala.AllWindowedStream.apply(AllWindowedStream.scala:315)
at main$.main(main.scala:23)
at main.main(main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: jep.Jep
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 11 more
--------

Apparently, the source of the problem is the third-party library Jep, which is used to call Python scripts.
Do I have to make the third-party library serializable?
Or is there a way to approach this sort of thing completely differently in Flink?

Any help (even with frameworks other than Flink) will be appreciated :-)
Thank you.

- Dongwon
Re: Proper way to call a Python function in WindowFunction.apply()

Chesnay Schepler
Hello,

I would suggest implementing a RichAllWindowFunction instead and instantiating Jep within open(), or maybe doing some lazy instantiation within apply().
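For the lazy variant, something along these lines should work (untested sketch, reusing the imports from your example):

--------
class WindowFunction extends AllWindowFunction[String, String, GlobalWindow] {
  // @transient keeps Flink from trying to serialize Jep with the function;
  // lazy defers creation until the first access on the task manager.
  @transient private lazy val jep: Jep = {
    val j = new Jep()
    j.runScript("prediction.py")
    j
  }

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // the first access to jep initializes it here, on the worker
    // ...
  }
}
--------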

Regards,
Chesnay

On 14.03.2017 15:47, 김동원 wrote:
[...]

Re: Proper way to call a Python function in WindowFunction.apply()

Chesnay Schepler
Hey,

Naturally, this implies that your script must be available on all nodes, so you will have to distribute it manually.
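If you want to avoid distributing the file by hand, one alternative (untested sketch; the class name is just for illustration) is to read the script source on the client, ship it with the function as a String, and write it to a temporary file in open() before running it:

--------
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import jep.Jep
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// The script travels with the (serializable) function as a plain String,
// so nothing has to be pre-installed on the task managers.
class ShippedScriptWindowFunction(scriptSource: String)
    extends RichAllWindowFunction[String, String, GlobalWindow] {

  @transient private var jep: Jep = _

  override def open(parameters: Configuration): Unit = {
    // materialize the shipped source as a local file, then run it
    val tmp = Files.createTempFile("prediction", ".py")
    Files.write(tmp, scriptSource.getBytes(StandardCharsets.UTF_8))
    jep = new Jep()
    jep.runScript(tmp.toString)
  }

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ...
  }
}
--------

This only covers the script itself, of course; the Python libraries it imports (Keras, Theano) still have to be installed on every node.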

On 14.03.2017 17:23, Chesnay Schepler wrote:
[...]


Re: Proper way to call a Python function in WindowFunction.apply()

Dongwon Kim-2
Alright, it works perfectly.
I verified that my Python methods are properly executed inside a RichAllWindowFunction.
Thanks a lot!

p.s. For those who wonder why I use Jep, see https://sushant-hiray.me/posts/python-in-scala-stack/ for the idea of using Python inside Java through Jep instead of Jython or JyNI.


------------
import jep.Jep
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

class WindowFunction extends RichAllWindowFunction[String, String, GlobalWindow] {
  var jep: Option[Jep] = None

  override def open(parameters: Configuration): Unit = {
    // Jep is created here, on the task manager after deserialization,
    // so the function itself stays serializable.
    jep = Some(new Jep())
    jep.foreach(_.runScript("prediction.py"))
  }

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ...
  }
}
------------
On 15.03.2017 01:27, Chesnay Schepler <[hidden email]> wrote:

[...]