Hey there,
Recently I had to join two streams, one of which may be idle for a long time. In Flink 1.12, the watermark strategy (`WatermarkStrategy`) has a `withIdleness` method that detects whether a stream is idle, so that the operator can still advance its watermark from the other, active stream instead of letting its state grow continuously.

Flink 1.10 has no such `withIdleness` method. The Flink 1.10 docs mention a workaround in https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission, but it doesn't work well.

After walking through the code, I found that StreamSourceContexts#getSourceContext takes an idleness parameter, which is hard-coded to -1 in StreamSource#run.

StreamSourceContexts#getSourceContext
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(

StreamSource#run
this.ctx = StreamSourceContexts.getSourceContext(

After extending the Flink Kafka connector and setting the idleness via reflection, I found it works as I expected!

I'm curious why Flink does not expose this parameter so that users can decide when a stream is idle, and what the side effects of setting it would be.

Thanks.
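For readers unfamiliar with the problem, here is a minimal sketch of why idleness detection matters when joining two inputs. It is a simplified model in plain Java, not Flink's actual code: the class name `IdleAwareWatermark` and its methods are hypothetical. It assumes the combined watermark is the minimum over all inputs that are not marked idle, and that it never moves backwards.

```java
import java.util.Arrays;

// Hypothetical, simplified model (not Flink code): combines per-input
// watermarks by taking the minimum over inputs that are not idle.
final class IdleAwareWatermark {
    private final long[] watermarks;  // last watermark seen on each input
    private final boolean[] idle;     // true while an input is marked idle
    private long combined = Long.MIN_VALUE;

    IdleAwareWatermark(int inputs) {
        watermarks = new long[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        idle = new boolean[inputs];
    }

    // A watermark arrived on an input; the input becomes active again.
    long onWatermark(int input, long timestamp) {
        idle[input] = false;
        watermarks[input] = Math.max(watermarks[input], timestamp);
        return recompute();
    }

    // The input produced nothing for too long and is excluded from the minimum.
    long markIdle(int input) {
        idle[input] = true;
        return recompute();
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // Only advance; the combined watermark must never regress.
        if (anyActive && min > combined) {
            combined = min;
        }
        return combined;
    }
}
```

In this model, as long as the silent input is still counted as active, the combined watermark stays at its initial value and anything buffered for the join can never be cleaned up. Once that input is marked idle, the watermark follows the active input.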
The idleTimeout you found is from an
earlier attempt at implementing idleness, but making it
configurable was aborted midway through as there were some API
issues. The effort was subsumed by a new source interface and
watermark generators that were introduced in 1.12.
Some more details can be found in FLINK-5018.
On 1/11/2021 12:40 PM, Akisaya wrote:
Thank you @chesnay. I tried in vain to find the issue about the introduction of the new watermark strategy. Can you provide some details about it?

Chesnay Schepler <[hidden email]> wrote on Mon, Jan 11, 2021 at 9:43 PM:
This is the parent ticket for the new source interface: FLINK-10740
This is the parent ticket for the reworked watermark generators: FLINK-17653
On 1/11/2021 5:16 PM, Akisaya wrote: