I have a process function defined with these interfaces:

public class MyProcessFunction<IN> extends ProcessFunction<IN, OUT> implements CheckpointedFunction, ProcessingTimeCallback {...}

In the snapshotState() method, I want to close files and emit metadata about the closed files to the downstream operator. That doesn't seem possible with the snapshotState(FunctionSnapshotContext context) interface.

I can keep the metadata in the snapshot and restore it during recovery. But if no input record arrives for a long time, processElement(T value, Context ctx, Collector<DataFile> out) won't be called, so I can't forward the restored data to the downstream operator with guaranteed latency.

I can add a timer, but onProcessingTime(long timestamp) doesn't seem to let me forward output to the downstream operator either.

Thanks,
Steven
Hi,
Indeed, it seems that it is not possible to emit records on checkpoint/snapshot through a ProcessFunction. However, you could do it via a custom operator (there you have constant access to the output collector). Another workaround might be to register a processing-time timer in your ProcessFunction:

@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
    ctx.timerService().registerProcessingTimeTimer(...);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
    // …
}

Piotrek
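To make the custom-operator idea concrete, here is a minimal plain-Java model of it. This is a sketch only and does not use the Flink API: `FileClosingOperatorModel`, its methods, and the `Consumer<String>` output are hypothetical stand-ins for a custom operator that keeps a reference to its output for its whole lifetime. Whether emitting during a snapshot is safe in a real Flink operator is not confirmed anywhere in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model, NOT Flink API: all names here are hypothetical stand-ins.
final class FileClosingOperatorModel {
    // Stand-in for the operator's long-lived output reference.
    private final Consumer<String> output;
    // Paths of files that are currently open for writing.
    private final List<String> openFiles = new ArrayList<>();

    FileClosingOperatorModel(Consumer<String> output) {
        this.output = output;
    }

    // Called per record: remember which file the record was written to.
    void processElement(String filePath) {
        if (!openFiles.contains(filePath)) {
            openFiles.add(filePath);
        }
    }

    // Called when a checkpoint is taken: "close" every open file and
    // emit one metadata entry per closed file to the downstream side.
    void snapshotState() {
        for (String path : openFiles) {
            output.accept("closed:" + path);
        }
        openFiles.clear();
    }
}
```

Because the output reference is a field rather than a method parameter, the snapshot hook can emit without waiting for the next input record, which is the property the ProcessFunction interface lacks.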
Piotr,

> However you could do it via a custom Operator (there you have a constant access to output collector).

Can you elaborate on that a little? Are you referring to "Output<StreamRecord<OUT>> output" in the AbstractStreamOperator class?

> register processing time service in your ProcessFunction.

I think your timer proposal can work. I was originally registering the timer like this, but the ProcessingTimeCallback interface doesn't supply the Collector parameter:

((StreamingRuntimeContext) getRuntimeContext())
    .getProcessingTimeService()
    .registerTimer(..., this);

Thanks,
Steven

On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski <[hidden email]> wrote:
On Mon, Jun 11, 2018 at 9:22 AM, Steven Wu <[hidden email]> wrote:
Hi,

> Can you elaborate that a little bit? are you referring to "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?

Yes. However, I have never tried it, so I'm not 100% sure there are no pitfalls with that.

Regarding processing-time timers: you should be able to register the timer once and then re-register it in the `onTimer(…)` callback using `ctx.timerService()`.

Piotrek
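The register-once-then-re-register pattern can be sketched as a small plain-Java model. This is not Flink code: `ManualTimerService`, `PeriodicFlusher`, `start`, and `restore` are hypothetical names chosen for illustration, and the model leaves open where the very first registration would happen in a real job. It only shows the control flow: each firing flushes whatever metadata is pending and schedules the next firing, so output does not depend on new input arriving.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Plain-Java model, NOT Flink API: every type and method name is a hypothetical stand-in.
final class ManualTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerProcessingTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Fires every timer due at or before `now`, including timers registered while firing.
    void advanceTo(long now, PeriodicFlusher callback, Consumer<String> out) {
        while (!timers.isEmpty() && timers.peek() <= now) {
            callback.onTimer(timers.poll(), this, out);
        }
    }
}

final class PeriodicFlusher {
    private final long intervalMs;
    // Metadata waiting to be forwarded downstream (for example, restored from a snapshot).
    private final List<String> pending = new ArrayList<>();

    PeriodicFlusher(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        this.intervalMs = intervalMs;
    }

    // Registers the first timer; called exactly once.
    void start(long now, ManualTimerService timers) {
        timers.registerProcessingTimeTimer(now + intervalMs);
    }

    // Models state restored during recovery.
    void restore(List<String> restored) {
        pending.addAll(restored);
    }

    // Flushes pending metadata, then re-registers itself for the next interval.
    void onTimer(long timestamp, ManualTimerService timers, Consumer<String> out) {
        pending.forEach(out);
        pending.clear();
        timers.registerProcessingTimeTimer(timestamp + intervalMs);
    }
}
```

In this model the worst-case delay before restored metadata is forwarded is one interval, which is the latency bound the original question asks for. As far as I know, timers obtained through `ctx.timerService()` in Flink are only supported on keyed streams, so treat that as an assumption to verify before relying on this pattern.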