Hi!
To work around FLINK-2491 which causes checkpointing issues for us I am trying to chain
SourceFunction
s so that the first one never quits. The basic idea is as follows:
class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner]) extends SourceFunction[Outer] {
override def run(outerCtx: SourceContext[Outer]): Unit = {
outerCtx.collect(...)
val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
innerSourceFunction.run(innerCtx)
}
override def cancel() = innerSourceFunction.cancel()
}
Is it ok to call run()
of a different
SourceFunction
inside of
run()
and implement my own SourceContext
delegating to another one? It works for a small test running on a local Flink environment, but I am wondering if there could
be any issues doing that on production.
Thanks,
Jochen