Hi,
Some context: after a refactoring, we were unable to start our jobs. They started fine and checkpointed fine, but once the job restarted owing to a transient failure, the application was unable to start. The Job Manager was OOM'ing (even when I gave it 256GB of RAM!). The `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside the `_metadata` file we saw `- 1402496 offsets: com.stripe.flink.backfill.kafka-archive-file-progress`. This happened to be the operator state we were no longer initializing or snapshotting after the refactoring.

Before I dig further into this and try to find a smaller reproducible test case, I thought I would ask if someone knows what the expected behaviour is for the following scenario:

Suppose you have an operator (in this case a Source) which has some operator ListState. Suppose you run your Flink job for some time and then later refactor your job such that you no longer use that state (so after the refactoring you're no longer initializing this operator state in `initializeState`, nor are you snapshotting the operator state in `snapshotState`). If you launch your new code from a recent savepoint, what do we expect to happen to the state? Do we anticipate the behaviour I explained above?

My assumption would be that Flink would not read this state, and so it would be removed from the next checkpoint or savepoint. Alternatively, I might assume it would not be read but would linger around in every future checkpoint or savepoint. However, it feels like what is happening is that it's not read and then possibly replicated by every instance of the task every time a checkpoint happens (hence the accidentally exponential behaviour).

Thoughts?

PS - in case someone asks: I was sure that we were calling `.clear()` appropriately in `snapshotState` (we, uh, already learned that lesson :D)

Best,

Aaron Levin
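Hi,

Do you use UNION state in your scenario? When using UNION state, the JM may run into an OOM because each TDD (TaskDeploymentDescriptor) will contain all the state of all subtasks [1].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state

Best,
Congxian

On Wed, Nov 27, 2019 at 3:55 AM Aaron Levin <[hidden email]> wrote:
> Hi,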
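To make the "all the state of all subtasks" point concrete, here is a toy model in plain Java. It is not Flink code, and every name in it (`UnionStateGrowthModel`, `restoreWithUnion`, `sizeAfterRestores`) is hypothetical. It assumes that on each restore every subtask receives the union of all subtasks' lists, and that each subtask then writes its copy back unchanged at the next checkpoint. Whether Flink does exactly this for union state that is no longer referenced is the open question of this thread, so treat it as an illustration of the suspected growth pattern only.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink code: union-redistributed list state that is
// restored on every restart but never cleared before the next checkpoint.
class UnionStateGrowthModel {

    // Union redistribution: every subtask gets the concatenation of all lists.
    static List<List<String>> restoreWithUnion(List<List<String>> snapshot) {
        List<String> union = new ArrayList<>();
        for (List<String> perSubtask : snapshot) {
            union.addAll(perSubtask);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < snapshot.size(); i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    // Number of entries a checkpoint holds if each subtask writes its list as-is.
    static long checkpointSize(List<List<String>> perSubtaskState) {
        long total = 0;
        for (List<String> state : perSubtaskState) {
            total += state.size();
        }
        return total;
    }

    // Builds an initial state, applies the given number of restores, returns the size.
    static long sizeAfterRestores(int parallelism, int entriesPerSubtask, int restores) {
        List<List<String>> state = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            List<String> list = new ArrayList<>();
            for (int j = 0; j < entriesPerSubtask; j++) {
                list.add("offset-" + i + "-" + j);
            }
            state.add(list);
        }
        for (int r = 0; r < restores; r++) {
            state = restoreWithUnion(state);
        }
        return checkpointSize(state);
    }

    public static void main(String[] args) {
        for (int r = 0; r <= 3; r++) {
            System.out.println("restores=" + r + " entries=" + sizeAfterRestores(4, 2, r));
        }
    }
}
```

Under these assumptions the entry count is multiplied by the parallelism on every restore (8, 32, 128, 512 for a parallelism of 4 and 2 entries per subtask), which would be consistent with a `_metadata` file that balloons after a few restarts.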
Hi,
Yes, we're using UNION state. I would assume, though, that if you are not reading the UNION state it would either stick around as a constant factor in your state size, or get cleared.

Looks like I should try to recreate a small example and submit a bug if this is true. Otherwise it's impossible to remove union state from your operators.

On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu <[hidden email]> wrote:
> Hi,
>
> Do you use UNION state in your scenario? When using UNION state, the JM may run into an OOM because each TDD (TaskDeploymentDescriptor) will contain all the state of all subtasks [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
>
> Best,
> Congxian
You are right, Aaron.

I would say this is by design: Flink doesn't require you to initialize state in the open method, so it has no safe way to delete the non-referenced ones.

What you can do is restore the state, clear it on all operators, and not reference it again. I know this feels like a workaround, but I have no better idea at the moment.

Cheers,
Gyula

On Wed, Nov 27, 2019 at 6:08 PM Aaron Levin <[hidden email]> wrote:
> Hi,
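The workaround described above (restore the old state once, clear it everywhere, then stop referencing it) can be sketched with another toy model in plain Java. Again, this is not Flink code and all names (`ClearUnusedUnionStateModel`, `Subtask`, `nextCheckpointSize`) are hypothetical stand-ins. In a real job the equivalent step would presumably be a transitional release whose `initializeState` still obtains the old state through `getOperatorStateStore().getUnionListState(...)` with the original descriptor and calls `clear()` on it; the descriptor name and element type from the original job are not shown in this thread, so they are left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model, NOT Flink code: a transitional release that still claims the
// legacy union state on restore and empties it before the next checkpoint.
class ClearUnusedUnionStateModel {

    // Stand-in for one parallel subtask holding the restored legacy list state.
    static final class Subtask {
        final List<String> legacyOffsets = new ArrayList<>();

        // Models restore: the union of all previous entries arrives here.
        void restore(List<String> unionOfAllSubtasks) {
            legacyOffsets.addAll(unionOfAllSubtasks);
        }

        // Models the workaround: keep the state registered, but empty it.
        void clearLegacyState() {
            legacyOffsets.clear();
        }

        // Models a checkpoint: whatever is still in the list gets written.
        List<String> snapshot() {
            return new ArrayList<>(legacyOffsets);
        }
    }

    // Restores each subtask from the previous snapshot and returns the next checkpoint's entry count.
    static int nextCheckpointSize(List<String> previousSnapshot, int parallelism, boolean clearOnRestore) {
        int total = 0;
        for (int i = 0; i < parallelism; i++) {
            Subtask subtask = new Subtask();
            subtask.restore(previousSnapshot);
            if (clearOnRestore) {
                subtask.clearLegacyState();
            }
            total += subtask.snapshot().size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> previous = Arrays.asList("file-a@10", "file-b@20", "file-c@30");
        System.out.println("without clear: " + nextCheckpointSize(previous, 4, false));
        System.out.println("with clear:    " + nextCheckpointSize(previous, 4, true));
    }
}
```

In this model, skipping the clear turns 3 entries into 12 at a parallelism of 4, while clearing on every subtask leaves the next checkpoint with 0 legacy entries. A savepoint taken after that transitional run should then be a safe starting point for code that no longer mentions the state at all.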
Thanks for the clarification. I'll try to find some time to write a reproducible test case and submit a ticket. While Flink may not be able to delete the non-referenced ones, I'm surprised it's exponentially replicating them, and so it's probably worth documenting in a ticket.

On Wed, Nov 27, 2019 at 12:15 PM Gyula Fóra <[hidden email]> wrote:
> You are right, Aaron.
>
> I would say this is by design: Flink doesn't require you to initialize state in the open method, so it has no safe way to delete the non-referenced ones.
>
> What you can do is restore the state, clear it on all operators, and not reference it again. I know this feels like a workaround, but I have no better idea at the moment.
>
> Cheers,
> Gyula