Call run() of another SourceFunction inside run()?

Schneider, Jochen

Hi!

To work around FLINK-2491, which causes checkpointing issues for us, I am trying to chain SourceFunctions so that the first one never quits. The basic idea is as follows:

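class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner]) extends SourceFunction[Outer] {
  override def run(outerCtx: SourceContext[Outer]): Unit = {
    // Emit directly on the outer context first.
    outerCtx.collect(...)

    // Then hand control to the inner source through a delegating context.
    val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
    innerSourceFunction.run(innerCtx)
  }

  // Cancelling the wrapper cancels the inner source.
  override def cancel(): Unit = innerSourceFunction.cancel()
}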

Is it OK to call run() of a different SourceFunction inside run(), and to implement my own SourceContext that delegates to another one? It works in a small test running on a local Flink environment, but I am wondering whether it could cause any issues in production.
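For readers following along, a delegating SourceContext could look roughly like the sketch below. It is not the code from the original post: the `convert` parameter (mapping Inner elements to Outer elements) is a hypothetical addition, and the method set assumes the `SourceContext` interface as it looked in Flink releases around 1.12. Forwarding `close()` to the outer context is a design choice here, not something the post confirms.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch only: forwards every SourceContext call to the outer context.
// `convert` is a hypothetical parameter that maps Inner to Outer elements.
class SourceContextWrapper[Inner, Outer](
    outerCtx: SourceContext[Outer],
    convert: Inner => Outer
) extends SourceContext[Inner] {

  override def collect(element: Inner): Unit =
    outerCtx.collect(convert(element))

  override def collectWithTimestamp(element: Inner, timestamp: Long): Unit =
    outerCtx.collectWithTimestamp(convert(element), timestamp)

  override def emitWatermark(mark: Watermark): Unit =
    outerCtx.emitWatermark(mark)

  override def markAsTemporarilyIdle(): Unit =
    outerCtx.markAsTemporarilyIdle()

  // The inner source must synchronize on the same lock as the outer one,
  // so the outer context's lock object is returned unchanged.
  override def getCheckpointLock(): AnyRef =
    outerCtx.getCheckpointLock

  // Assumption: closing the wrapper closes the outer context as well.
  override def close(): Unit =
    outerCtx.close()
}
```

Returning the outer context's checkpoint lock unchanged matters because sources that emit under the lock rely on it being the same object the runtime uses when taking checkpoints.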

Thanks,

                Jochen

Re: Call run() of another SourceFunction inside run()?

Piotr Nowojski-4
Hi,

I think it should work. At least off the top of my head, I do not see any reason why it shouldn't.

Just make sure that you proxy all relevant methods, not only those defined in `SourceFunction`. For example, `FlinkKafkaConsumer` implements or extends `RichParallelSourceFunction`, `CheckpointListener`, `CheckpointedFunction` and `ResultTypeQueryable<T>`, so if you want to wrap `FlinkKafkaConsumer`, you would need to proxy all of those interfaces and their calls from your `WrappingSourceFunction` to the `innerSourceFunction`.
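A rough sketch of what proxying those interfaces could look like follows. It assumes a Flink release around 1.12 (where `CheckpointListener` lives in `org.apache.flink.api.common.state`), uses a single element type `T` for brevity instead of the Inner/Outer pair from the question, and forwards calls one to one; a real wrapper would add its own logic in `run()`.

```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.CheckpointListener
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Sketch only: forwards every lifecycle and checkpoint call to the inner source.
class WrappingSourceFunction[T](
    inner: RichParallelSourceFunction[T]
      with CheckpointedFunction
      with CheckpointListener
      with ResultTypeQueryable[T]
) extends RichParallelSourceFunction[T]
    with CheckpointedFunction
    with CheckpointListener
    with ResultTypeQueryable[T] {

  // Rich function lifecycle
  override def setRuntimeContext(ctx: RuntimeContext): Unit = {
    super.setRuntimeContext(ctx)
    inner.setRuntimeContext(ctx)
  }
  override def open(parameters: Configuration): Unit = inner.open(parameters)
  override def close(): Unit = inner.close()

  // SourceFunction
  override def run(ctx: SourceContext[T]): Unit = inner.run(ctx)
  override def cancel(): Unit = inner.cancel()

  // CheckpointedFunction
  override def initializeState(context: FunctionInitializationContext): Unit =
    inner.initializeState(context)
  override def snapshotState(context: FunctionSnapshotContext): Unit =
    inner.snapshotState(context)

  // CheckpointListener
  override def notifyCheckpointComplete(checkpointId: Long): Unit =
    inner.notifyCheckpointComplete(checkpointId)
  override def notifyCheckpointAborted(checkpointId: Long): Unit =
    inner.notifyCheckpointAborted(checkpointId)

  // ResultTypeQueryable
  override def getProducedType(): TypeInformation[T] = inner.getProducedType
}
```

If any one of these is left unproxied, the inner source simply never receives that callback, for example it would not learn that a checkpoint completed.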

Best,
Piotrek

On Wed, 14 Apr 2021 at 11:36, Schneider, Jochen <[hidden email]> wrote:
