Access to a shared resource within a mapper


Timur Fayruzov
Hello,

I'm writing a Scala Flink application. I have a standalone process that exists on every Flink node that I need to call to transform my data. To access this process, I first need to initialize a client that is not thread-safe. I would like to avoid initializing a client for each element being transformed. A straightforward implementation would be something like this:
```
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
val pool = new ArrayBlockingQueue[Client](5)
// pool is filled here
data.map(e => {
  val client = pool.take()
  val res = client.transform(e)
  pool.put(client)
  res
})
```
However, this causes a runtime exception with the message "Task not serializable", which makes sense.

Function parameters and broadcast variables won't work either, as far as I understand. Is there a way to make this happen?

Thanks,
Timur

Re: Access to a shared resource within a mapper

Aljoscha Krettek
Hi,
you could use a RichMapFunction, which has an open() method:

data.map(new RichMapFunction[IN, OUT]() {
  override def open(parameters: Configuration): Unit = {
    // initialize client
  }

  override def map(input: IN): OUT = {
    // use client
  }
})

The open() method is called before any elements are passed to the function. Its counterpart is close(), which is called after all elements have been processed or when the job is cancelled.
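To make the pattern concrete, here is a fuller sketch (assuming the hypothetical Client and MyKey types from the original question, plus an assumed shutdown() method on Client for cleanup):

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class TransformMapper extends RichMapFunction[MyKey, String] {
  // Created in open(), so it is never serialized with the task.
  @transient private var client: Client = _

  override def open(parameters: Configuration): Unit = {
    // Runs once per parallel instance, on the TaskManager,
    // before any elements are processed.
    client = new Client()
  }

  override def map(input: MyKey): String = client.transform(input)

  override def close(): Unit = {
    // Runs after the last element, or on cancellation.
    if (client != null) client.shutdown()
  }
}

data.map(new TransformMapper)
```

Because the client is built inside open() on the worker, the enclosing function object stays serializable.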

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <[hidden email]> wrote:

Re: Access to a shared resource within a mapper

Timur Fayruzov
Outstanding! Thanks, Aljoscha.

On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <[hidden email]> wrote:

Re: Access to a shared resource within a mapper

Stephan Ewen
You may also be able to initialize the client only during parallel execution by making it a "lazy" variable in Scala.
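A sketch of that idea (again with the hypothetical Client type): the @transient lazy val pattern keeps the client out of the serialized closure and defers construction until the function is first invoked on a TaskManager.

```scala
import org.apache.flink.api.common.functions.MapFunction

class TransformMapper extends MapFunction[MyKey, String] {
  // Not serialized with the task; initialized on first access,
  // which happens during parallel execution on the worker.
  @transient private lazy val client = new Client()

  override def map(input: MyKey): String = client.transform(input)
}

data.map(new TransformMapper)
```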

On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <[hidden email]> wrote:

Re: Access to a shared resource within a mapper

Timur Fayruzov
Actually, a follow-up question: is the map function single-threaded (within one task manager, that is)? If it's not, then lazy initialization won't work, right?

On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <[hidden email]> wrote:

Re: Access to a shared resource within a mapper

Fabian Hueske
Hi Timur,

A TaskManager may run as many parallel subtasks of a Map operator as it has slots. Each subtask runs in its own thread, and each parallel subtask has its own MapFunction object, so it should be possible to use a lazy val.

However, you should not use static variables to hold state, because those are shared between all MapFunction instances in a TaskManager (JVM).
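The per-instance semantics of a lazy val (versus the shared state warned about above) can be illustrated in plain Scala, independent of Flink. A JVM-wide counter stands in for "how many clients were created":

```scala
import java.util.concurrent.atomic.AtomicInteger

object InitCounter {
  // Counts client initializations across the whole JVM.
  val count = new AtomicInteger(0)
}

class Mapper {
  // Initialized at most once per Mapper instance, on first access.
  lazy val client: String = "client-" + InitCounter.count.incrementAndGet()
  def map(x: Int): String = s"$client:$x"
}

val a = new Mapper
val b = new Mapper
a.map(1)
a.map(2)                          // does not re-initialize a's client
b.map(3)
println(InitCounter.count.get())  // 2: one initialization per instance
println(a.map(1))                 // client-1:1
```

Two Mapper instances yield exactly two initializations, no matter how many elements each processes; a static (companion-object) field, by contrast, would be one shared value for the whole JVM.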

2016-04-22 21:21 GMT+02:00 Timur Fayruzov <[hidden email]>:

Re: Access to a shared resource within a mapper

Timur Fayruzov
Hi Fabian,

I didn't realize you meant that the lazy val should go inside the RichMapFunction implementation; that makes sense. That's what I ended up doing already.

Thanks!
Timur
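For reference, a minimal sketch of that final arrangement, combining the RichMapFunction and lazy val suggestions from earlier in the thread (Client and MyKey are still the hypothetical types from the original question):

```scala
import org.apache.flink.api.common.functions.RichMapFunction

class TransformMapper extends RichMapFunction[MyKey, String] {
  // One client per parallel subtask: @transient keeps it out of the
  // serialized task, and lazy defers construction to the TaskManager.
  @transient private lazy val client = new Client()

  override def map(input: MyKey): String = client.transform(input)
}
```

Using the Rich variant keeps open()/close() available should explicit setup or cleanup become necessary later.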

On Mon, Apr 25, 2016 at 3:34 AM, Fabian Hueske <[hidden email]> wrote: