http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/What-happens-to-a-Source-s-Operator-State-if-it-stops-being-initialized-and-snapshotted-Accidentally-tp31260p31420.html
Thanks for the clarification. I'll try to find some time to write a
reproducible test case and submit a ticket. While it may not be able
>
> You are right Aaron.
>
> I would say this is by design: Flink doesn't require you to initialize state in the open method, so it has no safe way to delete state that is no longer referenced.
>
> What you can do is restore the state, clear it on all operators, and never reference it again. I know this feels like a workaround, but I have no better idea at the moment.
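>
> Roughly something like the following untested sketch (the class name, state name, and element type are placeholders for whatever your job actually used before the refactoring):
>
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
>
> public class LegacyStateClearingSource extends RichParallelSourceFunction<String>
>         implements CheckpointedFunction {
>
>     @Override
>     public void initializeState(FunctionInitializationContext context) throws Exception {
>         // Re-declare the orphaned state under its old name so Flink restores it,
>         // then drop its contents so the next snapshot writes it out empty.
>         // "legacy-progress-state" and Long are placeholders for the real descriptor.
>         ListStateDescriptor<Long> legacyDescriptor =
>                 new ListStateDescriptor<>("legacy-progress-state", Long.class);
>         ListState<Long> legacyState =
>                 context.getOperatorStateStore().getUnionListState(legacyDescriptor);
>         legacyState.clear();
>         // ... initialize the state the job still actually uses here ...
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>         // Deliberately never touch the legacy state again.
>     }
>
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         // ... actual source logic ...
>     }
>
>     @Override
>     public void cancel() {}
> }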
>
> Cheers,
> Gyula
>
> On Wed, Nov 27, 2019 at 6:08 PM Aaron Levin <[hidden email]> wrote:
>>
>> Hi,
>>
>> Yes, we're using UNION state. I would assume, though, that if you are
>> not reading the UNION state it would either stick around as a
>> constant factor in your state size, or get cleared.
>>
>> Looks like I should try to recreate a small example and submit a bug
>> if this is true. Otherwise it's impossible to remove union state from
>> your operators.
>>
>> On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu <[hidden email]> wrote:
>> >
>> > Hi
>> >
>> > Do you use UNION state in your scenario? When using UNION state, the JM may encounter OOM because each TDD (TaskDeploymentDescriptor) will contain all the state of all subtasks [1].
>> >
>> > [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
>> >
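>> > To make the difference concrete, here is a minimal, untested sketch (state
>> > names and types are placeholders; `context` is the FunctionInitializationContext
>> > inside initializeState). Union-redistributed list state hands every subtask the
>> > full union of all entries on restore, while plain list state is split across
>> > subtasks:
>> >
>> > ListStateDescriptor<Long> unionDescriptor =
>> >         new ListStateDescriptor<>("offsets-union", Long.class);
>> > // Union redistribution: on restore EVERY subtask receives ALL entries, so
>> > // the JM must ship the whole state to each subtask inside its TDD.
>> > ListState<Long> unionState =
>> >         context.getOperatorStateStore().getUnionListState(unionDescriptor);
>> >
>> > ListStateDescriptor<Long> splitDescriptor =
>> >         new ListStateDescriptor<>("offsets-split", Long.class);
>> > // Even-split redistribution: each subtask receives only its own share.
>> > ListState<Long> splitState =
>> >         context.getOperatorStateStore().getListState(splitDescriptor);
>> >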
>> > Best,
>> > Congxian
>> >
>> >
>> > Aaron Levin <[hidden email]> wrote on Wed, Nov 27, 2019 at 3:55 AM:
>> >>
>> >> Hi,
>> >>
>> >> Some context: after a refactoring, we were unable to restart our jobs.
>> >> They initially started fine and checkpointed fine, but once the job
>> >> restarted owing to a transient failure, the application was unable to
>> >> start. The Job Manager was OOM'ing (even when I gave it 256GB of RAM!).
>> >> The `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
>> >> the `_metadata` file we saw `- 1402496 offsets:
>> >> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
>> >> to be the operator state we were no longer initializing or
>> >> snapshotting after the refactoring.
>> >>
>> >> Before I dig further into this and try to find a smaller reproducible
>> >> test case I thought I would ask if someone knows what the expected
>> >> behaviour is for the following scenario:
>> >>
>> >> suppose you have an operator (in this case a Source) which has some
>> >> operator ListState. Suppose you run your flink job for some time and
>> >> then later refactor your job such that you no longer use that state
>> >> (so after the refactoring you're no longer initializing this operator
>> >> state in initializeState, nor are you snapshotting the operator state
>> >> in snapshotState). If you launch your new code from a recent
>> >> savepoint, what do we expect to happen to the state? Do we anticipate
>> >> the behaviour I explained above?
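>> >>
>> >> For concreteness, a minimal sketch of the two versions (field and state
>> >> names here are made up, not our real ones). Before the refactoring, the
>> >> source's CheckpointedFunction methods referenced the operator state:
>> >>
>> >> @Override
>> >> public void initializeState(FunctionInitializationContext context) throws Exception {
>> >>     // Restore (or create) the union list state under its stable name.
>> >>     progressState = context.getOperatorStateStore().getUnionListState(
>> >>             new ListStateDescriptor<>("archive-file-progress", Long.class));
>> >> }
>> >>
>> >> @Override
>> >> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>> >>     // Replace the state's contents with the offsets seen so far.
>> >>     progressState.clear();
>> >>     progressState.addAll(offsetsSeenSoFar);
>> >> }
>> >>
>> >> After the refactoring, both methods still exist but no longer mention that
>> >> state at all: it is neither restored nor snapshotted by our code, yet it is
>> >> still present in the savepoint we launch from.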
>> >>
>> >> My assumption would be that Flink would not read this state and so it
>> >> would be removed from the next checkpoint or savepoint. Alternatively,
>> >> I might assume it would not be read but would linger around every
>> >> future checkpoint or savepoint. However, it feels like what is
>> >> happening is it's not read and then possibly replicated by every
>> >> instance of the task every time a checkpoint happens (hence the
>> >> accidentally exponential behaviour).
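>> >>
>> >> A back-of-the-envelope illustration of that last hypothesis (the numbers
>> >> are purely made up): if each of `parallelism` subtasks restores the full
>> >> union of entries and writes all of them back on the next snapshot, the
>> >> entry count multiplies by the parallelism on every restore-and-snapshot
>> >> cycle.
>> >>
>> >> int parallelism = 32;
>> >> long entries = 1_000L;            // entries in the original checkpoint
>> >> for (int restore = 1; restore <= 3; restore++) {
>> >>     // Hypothesis: every subtask writes back the full restored union.
>> >>     entries *= parallelism;
>> >>     System.out.println("after restore " + restore + ": " + entries);
>> >> }
>> >> // prints 32000, then 1024000, then 32768000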
>> >>
>> >> Thoughts?
>> >>
>> >> PS - in case someone asks: I was sure that we were calling `.clear()`
>> >> appropriately in `snapshotState` (we, uh, already learned that lesson
>> >> :D)
>> >>
>> >> Best,
>> >>
>> >> Aaron Levin