Passing a custom SourceContext to a SourceFunction


Debasish Ghosh
Hi -

I have a custom SourceFunction:

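import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class MySourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
  // Emit every element of `data` through the context supplied by the caller.
  override def run(ctx: SourceContext[T]): Unit =
    data.foreach(d => ctx.collect(d))

  override def cancel(): Unit = () // required by SourceFunction; run() ends on its own
}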

When this function runs during job execution, the SourceContext that gets passed in serializes the data. I would like to pass a mock SourceContext (similar to https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java) to the run method instead. How do I do this? Note that I am not invoking the run method explicitly anywhere.

Any help will be appreciated.


Re: Passing a custom SourceContext to a SourceFunction

Chesnay Schepler
You cannot control what kind of SourceContext is passed into your function.

What are you trying to achieve?
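
If the goal is only to check which elements the source emits, one possible approach (a sketch, not something confirmed in this thread) is to skip job execution in the test and call run directly with a hand-written context. This does not change what the runtime passes in during a real job. CollectingSourceContext and DirectRunExample are hypothetical names introduced for illustration; the overridden methods are assumed to match the SourceContext interface of the Flink version in use.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.watermark.Watermark
import scala.collection.mutable.ListBuffer

// Hypothetical test double: keeps emitted elements in memory, no serialization.
class CollectingSourceContext[T] extends SourceContext[T] {
  val collected: ListBuffer[T] = ListBuffer.empty[T]
  private val lock = new Object

  override def collect(element: T): Unit = { collected += element; () }
  // Timestamps are ignored in this sketch; only the element is recorded.
  override def collectWithTimestamp(element: T, timestamp: Long): Unit = collect(element)
  override def emitWatermark(mark: Watermark): Unit = ()
  override def markAsTemporarilyIdle(): Unit = ()
  override def getCheckpointLock(): AnyRef = lock
  override def close(): Unit = ()
}

// Same source as in the original post, with the cancel() method the interface requires.
class MySourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = data.foreach(d => ctx.collect(d))
  override def cancel(): Unit = ()
}

object DirectRunExample extends App {
  val ctx = new CollectingSourceContext[Int]
  // Invoke run explicitly instead of letting a Flink job drive the source.
  new MySourceFunction(Seq(1, 2, 3)).run(ctx)
  println(ctx.collected.toList) // prints List(1, 2, 3)
}
```

Because run is called by hand here, the function never goes through the job graph, so this only exercises the emission logic and not serialization, checkpointing, or parallelism.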

In reply to Debasish Ghosh's message of 15/05/2019 09:30 (shown in full above).