mark kafka stream as idle if no data comes in for a while in flink 1.10

mark kafka stream as idle if no data comes in for a while in flink 1.10

Akisaya
Hey there,
I recently had to join two streams, one of which may be idle for a long time. In Flink 1.12, `WatermarkStrategy` has a `withIdleness` method that detects whether a stream is idle, so the downstream operator can still advance its watermark based on the other, active stream. Without it, the watermark stalls and the operator's state keeps growing.
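
To illustrate why idleness matters for a join, here is a minimal, self-contained sketch of how a two-input operator might combine watermarks. It is a simplified model and not Flink's actual implementation: the `Input` class, the `combinedWatermark` helper, and the stream names "orders" and "payments" are all hypothetical. The assumption is that the operator's watermark is the minimum over its inputs, and that inputs marked idle are skipped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

public class IdleAwareWatermark {

    /** One input of a hypothetical two-input operator (not a Flink class). */
    static final class Input {
        final String name;
        final long watermark; // latest watermark seen on this input
        final boolean idle;   // true once the input has been marked idle

        Input(String name, long watermark, boolean idle) {
            this.name = name;
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /**
     * Minimum watermark over all inputs that are not idle.
     * Returns an empty result if every input is idle.
     */
    static OptionalLong combinedWatermark(List<Input> inputs) {
        return inputs.stream()
                .filter(in -> !in.idle)
                .mapToLong(in -> in.watermark)
                .min();
    }

    public static void main(String[] args) {
        Input orders = new Input("orders", 10_000L, false);
        Input quietNotIdle = new Input("payments", 1_000L, false);
        Input quietIdle = new Input("payments", 1_000L, true);

        // The quiet stream holds the operator's watermark back.
        System.out.println(
                combinedWatermark(Arrays.asList(orders, quietNotIdle)).getAsLong()); // 1000

        // Once the quiet stream is marked idle, the active stream drives the watermark.
        System.out.println(
                combinedWatermark(Arrays.asList(orders, quietIdle)).getAsLong()); // 10000
    }
}
```

In the first case the watermark stays at the quiet stream's last value, so time-based state in the join cannot be cleaned up. In the second case the watermark follows the active stream.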

But Flink 1.10 has no such `withIdleness` method.

After walking through the code, I found that StreamSourceContexts#getSourceContext takes an idleTimeout parameter, which is hard-coded to -1 in StreamSource#run.

StreamSourceContexts#getSourceContext
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
      TimeCharacteristic timeCharacteristic,
      ProcessingTimeService processingTimeService,
      Object checkpointLock,
      StreamStatusMaintainer streamStatusMaintainer,
      Output<StreamRecord<OUT>> output,
      long watermarkInterval,
      long idleTimeout) {


StreamSource#run
this.ctx = StreamSourceContexts.getSourceContext(
   timeCharacteristic,
   getProcessingTimeService(),
   lockingObject,
   streamStatusMaintainer,
   collector,
   watermarkInterval,
   -1);
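
As a rough illustration of what an idle timeout like the one above might control, here is a self-contained sketch of timeout-based idleness detection. It is not Flink code: `IdleDetector`, `onRecord`, and `isIdle` are hypothetical names, and the sketch assumes that -1 means "idleness detection disabled" while a positive value is a timeout in milliseconds. Timestamps are passed in explicitly so the behavior is deterministic.

```java
public class IdleTimeoutSketch {

    /** Assumed sentinel: -1 turns idleness detection off. */
    static final long DISABLED = -1L;

    /** Hypothetical detector that marks a source idle after a quiet period. */
    static final class IdleDetector {
        private final long idleTimeoutMs;
        private long lastRecordTimeMs;

        IdleDetector(long idleTimeoutMs, long startTimeMs) {
            if (idleTimeoutMs != DISABLED && idleTimeoutMs <= 0) {
                throw new IllegalArgumentException(
                        "idle timeout must be -1 (disabled) or positive");
            }
            this.idleTimeoutMs = idleTimeoutMs;
            this.lastRecordTimeMs = startTimeMs;
        }

        /** Call whenever the source emits a record; resets the quiet period. */
        void onRecord(long nowMs) {
            lastRecordTimeMs = nowMs;
        }

        /** True if detection is enabled and no record arrived within the timeout. */
        boolean isIdle(long nowMs) {
            return idleTimeoutMs != DISABLED
                    && nowMs - lastRecordTimeMs >= idleTimeoutMs;
        }
    }

    public static void main(String[] args) {
        IdleDetector disabled = new IdleDetector(DISABLED, 0L);
        IdleDetector enabled = new IdleDetector(60_000L, 0L);

        // With -1 the source is never reported idle, however long it stays quiet.
        System.out.println(disabled.isIdle(120_000L)); // false

        // With a 60 s timeout, two quiet minutes mark the source idle.
        System.out.println(enabled.isIdle(120_000L)); // true

        // A new record makes the source active again.
        enabled.onRecord(120_000L);
        System.out.println(enabled.isIdle(130_000L)); // false
    }
}
```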

After extending the Flink Kafka connector and setting the idle timeout via reflection, I found that it works as I expected!



I'm curious why Flink does not expose this parameter so that users can decide when a stream is idle, and what the side effects of setting it would be.

thx.





Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

Chesnay Schepler
The idleTimeout you found is from an earlier attempt at implementing idleness, but making it configurable was aborted midway through as there were some API issues. The effort was subsumed by a new source interface and watermark generators that were introduced in 1.12.

Some more details can be found in FLINK-5018.

On 1/11/2021 12:40 PM, Akisaya wrote:
I'm curious why Flink does not expose this parameter so that users can decide when a stream is idle, and what the side effects of setting it would be.


Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

Akisaya
Thank you, @chesnay.

I tried in vain to find the issue that introduced the new watermark strategy. Could you provide some details about it?

Chesnay Schepler <[hidden email]> wrote on Monday, January 11, 2021 at 9:43 PM:
The idleTimeout you found is from an earlier attempt at implementing idleness, but making it configurable was aborted midway through as there were some API issues. The effort was subsumed by a new source interface and watermark generators that were introduced in 1.12.

Some more details can be found in FLINK-5018.


Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

Chesnay Schepler
This is the parent ticket for the new source interface: FLINK-10740
This is the parent ticket for the reworked watermark generators: FLINK-17653

On 1/11/2021 5:16 PM, Akisaya wrote:
Thank you, @chesnay.

I tried in vain to find the issue that introduced the new watermark strategy. Could you provide some details about it?
