how to emit record to downstream operator in snapshotState and/or onProcessingTime


Steven Wu
I have a process function defined with these interfaces

public class MyProcessFunction<IN> extends ProcessFunction<IN, OUT> 
    implements CheckpointedFunction, ProcessingTimeCallback {...}

In the snapshotState() method, I want to close files and emit metadata about the closed files to the downstream operator. That doesn't seem possible with the snapshotState(FunctionSnapshotContext context) interface.

I can keep the metadata in the snapshot and restore it during recovery. But if no input record arrives for a long time, processElement(T value, Context ctx, Collector<DataFile> out) won't be called, so I can't forward the restored data to the downstream operator with guaranteed latency.

I can add a timer, but onProcessingTime(long timestamp) doesn't seem to let me forward output to the downstream operator either.

Thanks,
Steven
Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

Piotr Nowojski
Hi,

Indeed, it does not seem possible to emit records on checkpoint/snapshot through a ProcessFunction. However, you could do it via a custom operator (there you have constant access to the output collector). Another workaround might be to register a processing-time timer in your ProcessFunction:

@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
   ctx.timerService().registerProcessingTimeTimer(...);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
   // …
}

Piotrek

Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

Steven Wu
Piotr,

> However you could do it via a custom Operator (there you have a constant access to output collector).

Can you elaborate on that a little? Are you referring to "Output<StreamRecord<OUT>> output" in the AbstractStreamOperator class?

> register processing time service in your ProcessFunction.

I think your timer proposal can work.

I was originally registering the timer like this, but the ProcessingTimeCallback interface doesn't supply a Collector parameter:

((StreamingRuntimeContext) getRuntimeContext())
    .getProcessingTimeService()
    .registerTimer(..., this);

Thanks, 
Steven



Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

Steven Wu

@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
   ctx.timerService().registerProcessingTimeTimer(...);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
   // …
}

Correcting myself regarding the timer proposal above: it still requires a record to come in first, because the timer is registered in processElement(). I am trying to guard against long idle gaps, during which I would not be able to register a timer.


Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

Piotr Nowojski
Hi,

> Can you elaborate that a little bit? are you referring to "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?

Yes. However, I have never tried it, so I’m not 100% sure there are no pitfalls with that.
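
A minimal sketch of the idea, written as a plain-Java model rather than against the Flink API: the operator keeps its output handle as a field, so the snapshot path can emit without waiting for an input record. The names Output, FileClosingOperator and the "file@checkpoint-N" metadata format are hypothetical stand-ins, and whether emitting from a real operator's snapshot method is safe is exactly the untested part mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not the Flink API) of an operator that keeps its output handle as a field. */
class SnapshotEmittingOperatorSketch {

    /** Hypothetical stand-in for the operator's downstream output. */
    interface Output<T> {
        void collect(T record);
    }

    /** Tracks open files and emits metadata about them when a snapshot is taken. */
    static final class FileClosingOperator {
        private final Output<String> output;
        private final List<String> openFiles = new ArrayList<>();

        FileClosingOperator(Output<String> output) {
            this.output = output;
        }

        /** Normal record path: only remembers which file the record was written to. */
        void processElement(String fileName) {
            if (!openFiles.contains(fileName)) {
                openFiles.add(fileName);
            }
        }

        /** Snapshot path: "closes" the open files and emits their metadata directly. */
        void snapshotState(long checkpointId) {
            for (String file : openFiles) {
                output.collect(file + "@checkpoint-" + checkpointId);
            }
            openFiles.clear();
        }
    }

    /** Runs two snapshots; the second has nothing left to emit. */
    static List<String> demo() {
        List<String> downstream = new ArrayList<>();
        FileClosingOperator operator = new FileClosingOperator(downstream::add);
        operator.processElement("part-0");
        operator.processElement("part-1");
        operator.processElement("part-0");
        operator.snapshotState(7L);
        operator.snapshotState(8L);
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the model is only the ownership of the output handle: because it is a field and not a method parameter, any callback on the operator can reach it.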

Regarding processing-time timers: you should be able to register the timer once and then re-register it in the `onTimer(…)` callback using `ctx.timerService()`.
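
The re-registration pattern can be sketched in plain Java with a manually advanced clock. This is a model, not Flink code: ManualTimerService, TimerCallback and PeriodicFlusher are hypothetical names, and the 1000 ms interval is an arbitrary assumption. It shows only that once the first timer exists, later firings no longer depend on new input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

/** Plain-Java model (not the Flink API) of a timer that re-registers itself on every firing. */
class SelfRearmingTimerSketch {

    /** Hypothetical callback shape: timestamp, timer service, and an output collector. */
    interface TimerCallback {
        void onTimer(long timestamp, ManualTimerService timerService, Consumer<String> out);
    }

    /** Hypothetical timer service driven by a manually advanced clock. */
    static final class ManualTimerService {
        private final PriorityQueue<Long> timers = new PriorityQueue<>();
        private long now = 0L;

        long currentProcessingTime() {
            return now;
        }

        void registerProcessingTimeTimer(long timestamp) {
            timers.add(timestamp);
        }

        /** Advances the clock and fires every due timer in timestamp order. */
        void advanceTo(long newTime, TimerCallback callback, Consumer<String> out) {
            while (!timers.isEmpty() && timers.peek() <= newTime) {
                long timestamp = timers.poll();
                now = timestamp;
                callback.onTimer(timestamp, this, out);
            }
            now = newTime;
        }
    }

    /** Emits pending metadata on each firing, then schedules the next firing. */
    static final class PeriodicFlusher implements TimerCallback {
        private final long intervalMs;
        private final List<String> pending = new ArrayList<>();

        PeriodicFlusher(long intervalMs) {
            this.intervalMs = intervalMs;
        }

        void addPending(String metadata) {
            pending.add(metadata);
        }

        @Override
        public void onTimer(long timestamp, ManualTimerService timerService, Consumer<String> out) {
            pending.forEach(out);
            pending.clear();
            // Re-register so the next firing does not depend on a new input record.
            timerService.registerProcessingTimeTimer(timestamp + intervalMs);
        }
    }

    static List<String> demo() {
        ManualTimerService timers = new ManualTimerService();
        PeriodicFlusher flusher = new PeriodicFlusher(1000L);
        List<String> emitted = new ArrayList<>();

        // The first registration happens exactly once.
        timers.registerProcessingTimeTimer(timers.currentProcessingTime() + 1000L);

        flusher.addPending("file-1");
        timers.advanceTo(1000L, flusher, emitted::add); // fires at t=1000

        flusher.addPending("file-2");
        timers.advanceTo(3500L, flusher, emitted::add); // fires at t=2000 and t=3000
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The model leaves one thing open, as the thread does: the very first registration still has to come from somewhere, such as the first record.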

Piotrek
