Hi!
I'm running a Flink streaming job that backfills older data. It has multiple interval joins. I noticed that my checkpoints keep growing in size, whereas I'd expect them to stabilize. Is there a setting to prune useless data from the checkpoint? My top guess is that my checkpoints contain a lot of useless state. - Dan
Hi Dan, Have you used an overly large upperBound or lowerBound? If not, could you also check the watermark strategy? The interval join operator relies on event-time timers for cleanup, and event-time timers only fire when the watermark advances past them. Best, Yun
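Yun's point about cleanup can be modeled in a few lines of plain Java, with no Flink dependency. The `CleanupModel` class below is hypothetical and heavily simplified: each buffered element schedules a cleanup timer at its timestamp plus a `retention` value (standing in for what the real operator derives from the join bounds), and timers only fire once the watermark reaches them. If the watermark stalls, nothing is ever removed.

```java
import java.util.PriorityQueue;

/**
 * Hypothetical, simplified model of event-time cleanup in an interval join buffer.
 * Not Flink's API: it only mimics "a timer fires when the watermark reaches it".
 */
final class CleanupModel {
    // Pending cleanup timers, ordered by timestamp (like an event-time timer queue).
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private int buffered = 0;
    private long watermark = Long.MIN_VALUE;

    /** Buffers an element and schedules its cleanup at timestamp + retention. */
    void add(long timestamp, long retention) {
        buffered++;
        timers.add(timestamp + retention);
    }

    /** Advances the watermark and fires every timer whose timestamp is <= the watermark. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            timers.poll();
            buffered--; // the fired timer removes its element from the buffer
        }
    }

    int bufferedCount() {
        return buffered;
    }
}
```

If `advanceWatermark` is never called with a larger value, `bufferedCount()` only grows, which is the same symptom as checkpoints that never stabilize.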
Hi Yun! Thanks for the quick reply. One of the lowerBounds is large, but the table being joined with has only ~500 rows. I also have my own operator that only outputs the first value:

```java
import com.google.protobuf.GeneratedMessageV3;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Emits only the first record seen for each key; must run on a KeyedStream.
public class OnlyFirstUser<T extends GeneratedMessageV3> extends RichFlatMapFunction<T, T> {
    // One flag per key: null until that key's first record has been emitted.
    private transient ValueState<Boolean> alreadyOutputted;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("alreadyOutputted", Types.BOOLEAN);
        alreadyOutputted = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(T value, Collector<T> out) throws Exception {
        // value() returns null for a key that has no state yet.
        if (alreadyOutputted.value() == null) {
            alreadyOutputted.update(true);
            out.collect(value);
        }
    }
}
```

All of my inputs use this watermark strategy:

```java
.assignTimestampsAndWatermarks(
        WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withIdleness(Duration.ofMinutes(1)));
```

In the Flink UI, early in the job run, I see "Low Watermarks" on each node and they increase. After some checkpoint failures, low watermarks stop appearing in the UI. Thanks Yun! On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <[hidden email]> wrote:
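Since the join can only clean up when its watermark advances, it helps to see how that watermark is derived. A minimal plain-Java sketch, assuming two things: the bounded-out-of-orderness generator emits "maximum timestamp seen, minus the bound, minus 1 ms", and a two-input operator such as an interval join takes the minimum of its inputs' watermarks while ignoring inputs marked idle. The `WatermarkModel` class is hypothetical and only models this arithmetic.

```java
import java.util.OptionalLong;

/** Hypothetical model of watermark arithmetic; not Flink's API. */
final class WatermarkModel {
    /** Watermark from a bounded-out-of-orderness generator after seeing maxTimestamp. */
    static long boundedOutOfOrderness(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /**
     * Watermark of a multi-input operator: the minimum over inputs that are not idle.
     * Empty if every input is idle (the operator makes no progress of its own).
     */
    static OptionalLong combined(long[] inputWatermarks, boolean[] idle) {
        OptionalLong min = OptionalLong.empty();
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold the operator back
            }
            long w = inputWatermarks[i];
            min = OptionalLong.of(min.isPresent() ? Math.min(min.getAsLong(), w) : w);
        }
        return min;
    }
}
```

With `Duration.ofSeconds(1)`, an input whose newest record has timestamp 10,000 ms yields a watermark of 8,999 ms, and a single lagging input that is not idle pins the join's watermark for both sides.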
Hi Dan, Regarding the original checkpoint size problem, could you also check in the checkpoint UI which tasks' state is growing? For example, the operator you attached has an `alreadyOutputted` value state, which would keep growing if new keys keep arriving, since each key gets its own entry. Best, Yun
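Yun's point about new keys can be illustrated with a plain-Java stand-in for keyed state: a hypothetical `FirstValueModel` backed by a `HashMap`, not Flink's API. Each distinct key leaves one flag behind, so state size tracks the number of distinct keys rather than the data rate. In real Flink code the usual remedy is state TTL, configured with `StateTtlConfig` and attached via `ValueStateDescriptor.enableTimeToLive`; the `ttlMillis` parameter below only loosely imitates that, with expiry done by an explicit sweep.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of an "emit only the first value per key" operator's keyed state.
 * A HashMap stands in for Flink keyed state; ttlMillis loosely imitates state TTL.
 */
final class FirstValueModel {
    private final Map<String, Long> flagWrittenAt = new HashMap<>(); // key -> time the flag was set
    private final long ttlMillis;

    FirstValueModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if the record should be emitted, i.e. no live flag exists for its key. */
    boolean process(String key, long nowMillis) {
        Long writtenAt = flagWrittenAt.get(key);
        if (writtenAt != null && nowMillis - writtenAt < ttlMillis) {
            return false; // already emitted for this key
        }
        flagWrittenAt.put(key, nowMillis);
        return true;
    }

    /** Drops flags older than the TTL; without any expiry the map only ever grows. */
    void expire(long nowMillis) {
        flagWrittenAt.values().removeIf(writtenAt -> nowMillis - writtenAt >= ttlMillis);
    }

    int stateEntries() {
        return flagWrittenAt.size();
    }
}
```

The trade-off shows up once a flag expires: a later record for that key is emitted again, so a TTL only fits if it is longer than the window in which duplicates can arrive.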
Hi Yun! That advice was useful. The state for that operator is very small (31 KB). Most of the checkpoint size is in a couple of simple DataStream.intervalJoin operators, and their time intervals are fairly short. I'm going to try running the code with some small configuration changes. One thing I did notice is that I set a positive value for the relativeUpperBound. I'm not sure whether I've found a bug in IntervalJoinOperator: the logic in IntervalJoinOperator.onEventTime needs the exact timestamp for cleanup, and its logic for cleaning up the right side uses timerTimestamp + lowerBound. However, processElement doesn't use the same logic when creating the timer (I only see + lowerBound). Maybe I'm misreading the code, but it feels like a bug. On Mon, Mar 8, 2021 at 10:29 PM Yun Gao <[hidden email]> wrote:
Hey Dan, I think the logic is correct. Note that processElement uses the *relative* upper/lower bounds, which for the right input are the global bounds negated and swapped:
- relativeUpperBound = upperBound for the left input and -lowerBound for the right input
- relativeLowerBound = lowerBound for the left input and -upperBound for the right input
Therefore the cleanup logic in onEventTime is consistent with the timers registered in processElement. If I understand it correctly, this trick was introduced so that a single processElement method can serve both inputs. There might be a bug somewhere, but I don't think it's where you pointed. I'd suggest first investigating the progress of watermarks. Best, Dawid On 09/03/2021 08:36, Dan Hill wrote:
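Dawid's argument can be checked with a little arithmetic. The plain-Java sketch below restates the two formulas as we read them in Flink's `IntervalJoinOperator` source (timer registration in `processElement`, buffer removal in `onEventTime`); the `RelativeBoundsCheck` class itself is hypothetical. For every timestamp and every valid bound combination, the timestamp recovered when the timer fires equals the element's own timestamp, which is exactly what removing that element's bucket from the buffer requires.

```java
/**
 * Hypothetical restatement of IntervalJoinOperator's cleanup-time arithmetic.
 * lowerBound/upperBound are the global bounds passed to between(lowerBound, upperBound).
 */
final class RelativeBoundsCheck {
    /** Timer timestamp registered in processElement for an element with timestamp ts. */
    static long cleanupTime(long ts, long relativeUpperBound) {
        return relativeUpperBound > 0L ? ts + relativeUpperBound : ts;
    }

    /** Left input: relative bounds are (lowerBound, upperBound). */
    static long leftTimer(long ts, long upperBound) {
        return cleanupTime(ts, upperBound);
    }

    /** Right input: relative bounds are (-upperBound, -lowerBound), so relativeUpper = -lowerBound. */
    static long rightTimer(long ts, long lowerBound) {
        return cleanupTime(ts, -lowerBound);
    }

    /** Buffer timestamp removed from the left buffer when a left timer fires. */
    static long leftRemoved(long timerTs, long upperBound) {
        return upperBound <= 0L ? timerTs : timerTs - upperBound;
    }

    /** Buffer timestamp removed from the right buffer when a right timer fires. */
    static long rightRemoved(long timerTs, long lowerBound) {
        return lowerBound <= 0L ? timerTs + lowerBound : timerTs;
    }
}
```

The same arithmetic also shows how long elements stay buffered: a left element stays until the watermark passes `ts + upperBound` when `upperBound` is positive, and a right element until `ts - lowerBound` when `lowerBound` is negative, so a bound with a large magnitude directly extends how long state is retained.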
I figured it out. Some of my records share the same key, and I was doing an IntervalJoin on that key. In one of the IntervalJoin implementations I found, the runtime looks like it grows much faster than linearly when there are duplicate keys. I introduced a de-duping step and the job now runs a lot faster. On Thu, Mar 11, 2021 at 5:30 AM Dawid Wysakowicz <[hidden email]> wrote:
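Why duplicates hurt so much: in an inner join, every left record pairs with every matching right record, so a key with n left and m right duplicates inside the time bounds produces n × m results, and Flink's interval join also scans the other side's buffered entries for that key on every arriving element. The plain-Java sketch below uses a hypothetical `DuplicateKeyJoin` helper that ignores time bounds and counts the pairs before and after keeping only the first record per key. That is one possible form of a de-duping step; it assumes records sharing a key are interchangeable, which may not hold for your data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: counts inner-join output pairs on a key, ignoring time bounds. */
final class DuplicateKeyJoin {
    static long joinPairs(List<String> leftKeys, List<String> rightKeys) {
        Map<String, Long> rightCounts = new HashMap<>();
        for (String key : rightKeys) {
            rightCounts.merge(key, 1L, Long::sum);
        }
        long pairs = 0;
        for (String key : leftKeys) {
            pairs += rightCounts.getOrDefault(key, 0L); // pairs with every right record of that key
        }
        return pairs;
    }

    /** Keeps the first occurrence of each key, preserving arrival order. */
    static List<String> dedup(List<String> keys) {
        return new ArrayList<>(new LinkedHashSet<>(keys));
    }
}
```

For 100 left and 50 right records sharing one key, the join emits 5,000 pairs; after de-duplicating both sides it emits one.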