Per the documentation:
"The meta data file of a Savepoint contains (primarily) pointers to all files on stable storage that are part of the Savepoint, in form of absolute paths." I somehow have a _metadata file that's 1.9GB. Running strings on it I find 962 strings, most of which look like HDFS paths, which leaves a lot of that file-size unexplained. What else is in there, and how exactly could this be happening? We're running 1.6. Jacob |
Hi Jacob,
Could you specify which StateBackend you are using? The reason I am asking is that, from the documentation in [1]: "Note that if you use the MemoryStateBackend, metadata and savepoint state will be stored in the _metadata file. Since it is self-contained, you may move the file and restore from any location." I am also cc'ing Gordon who may know a bit more about state formats. I hope this helps, Kostas [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html |
Hi Jacob, Apart from what Klou already mentioned, there is one other possible reason: if you are using the FsStateBackend, your state may be small enough to be stored inline within the metadata file. That is governed by the "state.backend.fs.memory-threshold" configuration, with a default value of 1024 bytes, and can also be configured with the `fileStateSizeThreshold` argument when constructing the `FsStateBackend`. The purpose of that threshold is to ensure that the backend does not create a large number of very small files, where the file pointers are potentially larger than the state itself. Cheers, Gordon |
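The threshold rule described in the message above can be pictured with a small standalone model. This is only a sketch and not Flink code: the class and the `storedInline` helper are hypothetical names, and the strict "smaller than" comparison at the boundary is an assumption.

```java
/** Toy model of the inline-vs-file decision; this is not Flink code. */
final class InlineThresholdSketch {

    /** Documented default of state.backend.fs.memory-threshold, in bytes. */
    static final int DEFAULT_THRESHOLD_BYTES = 1024;

    /**
     * Hypothetical helper: returns true when a piece of state would be
     * embedded in _metadata instead of being written to its own file.
     * Assumption: the comparison is strict ("smaller than the threshold").
     */
    static boolean storedInline(long stateSizeBytes, int thresholdBytes) {
        return stateSizeBytes < thresholdBytes;
    }

    public static void main(String[] args) {
        // A 200-byte state chunk is inlined under the default threshold.
        System.out.println(storedInline(200, DEFAULT_THRESHOLD_BYTES));
        // A 5 MB chunk goes to its own file; only a pointer lands in _metadata.
        System.out.println(storedInline(5_000_000, DEFAULT_THRESHOLD_BYTES));
    }
}
```

Under this model, inlined state only accounts for a large `_metadata` file if there is a very large number of small chunks, since each chunk is below the threshold.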
Kostas and Gordon, Thanks for the suggestions! I'm on RocksDB. We don't have that setting configured, so it should be at the default of 1024 bytes. This is the full "state.*" section showing in the JobManager UI. Jacob
Jacob Sevart Software Engineer, Safety |
Hi Jacob,
As I said previously, I am not 100% sure what could be causing this behavior, but there is a related thread here: https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E where you can re-post your problem and monitor for answers. Cheers, Kostas |
Thanks, I will monitor that thread. I'm having a hard time following the serialization code, but if you know anything about the layout, tell me if this makes sense. What I see in the hex editor is, first, many HDFS paths. Then gigabytes of unreadable data. Then finally another HDFS path at the end. If it is putting state in there, under normal circumstances, does it make sense that it would be interleaved with metadata? I would expect all the metadata to come first, and then state. Jacob |
Hi, As Gordon said, the metadata will contain ByteStreamStateHandle entries. When a ByteStreamStateHandle is written out, its handle name is written as well, and that name is a path (as you saw). A ByteStreamStateHandle is created when the state size is smaller than `state.backend.fs.memory-threshold` (default is 1024). If you want to verify this, you can refer to the unit test `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the metadata; you will find that there are many `ByteStreamStateHandle` instances, and their names are the strings you saw in the metadata. Best, Congxian
|
Running Checkpoints.loadCheckpointMetadata under a debugger, I found something: subtaskState.managedOperatorState[0].stateNameToPartitionOffsets("startup-times").offsets.value weighs 43MB (5.3 million longs). "startup-times" is an operator state of mine (a union list of java.time.Instant). I see a way to end up with fewer items in the list, but I'm not sure how the actual size is related to the number of offsets. Can you elaborate on that? Incidentally, 42.5MB is the number I got out of https://issues.apache.org/jira/browse/FLINK-14618. So I think my two problems are closely related. Jacob |
Oh, I should clarify that's 43MB per partition, so with 48 partitions it explains my 2GB. |
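The arithmetic behind those figures can be checked with a short sketch. It assumes each offset is stored as an 8-byte long and takes the rounded "5.3 million" entries and 48 partitions from the messages above; the class and method names are illustrative only.

```java
import java.util.Locale;

/** Back-of-the-envelope estimate of offset bytes in _metadata; not Flink code. */
final class UnionOffsetSizeEstimate {

    /** Bytes needed to store one long offset per list entry. */
    static long offsetBytes(long entries) {
        return entries * Long.BYTES;
    }

    /** Offset bytes summed over all subtasks (partitions). */
    static long totalBytes(long entriesPerSubtask, int subtasks) {
        return offsetBytes(entriesPerSubtask) * subtasks;
    }

    public static void main(String[] args) {
        long perSubtask = offsetBytes(5_300_000L);  // rounded figure from the thread
        long total = totalBytes(5_300_000L, 48);
        // prints "per subtask: 42.4 MB"
        System.out.println(String.format(Locale.ROOT, "per subtask: %.1f MB", perSubtask / 1e6));
        // prints "total: 2.04 GB (1.90 GiB)"
        System.out.println(String.format(Locale.ROOT, "total: %.2f GB (%.2f GiB)",
                total / 1e9, total / (double) (1L << 30)));
    }
}
```

The result of about 1.90 GiB is in line with the 1.9GB `_metadata` file reported at the start of the thread, so the offsets alone are enough to account for its size.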
Hi Jacob, I think you are running into some deficiencies of Flink's union state here. The problem is that for every entry in your list state, Flink stores a separate offset (a long value). The reason for this behaviour is that we use the same state implementation for the union state as well as for the split state. For the latter, the offset information is required to split the state in case of changing the parallelism of your job. My recommendation would be to try to get rid of union state altogether. The union state has primarily been introduced to checkpoint some source implementations and might become deprecated due to performance problems once these sources can be checkpointed differently. Cheers, Till
|
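The per-entry offsets Till describes can be pictured with a standalone model that serializes a list and records where each entry starts. This is a sketch, not Flink's serialization code: the 12-byte encoding of `java.time.Instant` and all class names are assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.List;

/** Toy snapshot of a list state that keeps one start offset per entry. */
final class ListStateOffsetsSketch {

    /** Serialized entries plus the byte offset at which each entry begins. */
    static final class Snapshot {
        final byte[] data;
        final long[] offsets;

        Snapshot(byte[] data, long[] offsets) {
            this.data = data;
            this.offsets = offsets;
        }
    }

    static Snapshot snapshot(List<Instant> entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        long[] offsets = new long[entries.size()];
        try {
            for (int i = 0; i < entries.size(); i++) {
                offsets[i] = out.size();            // bytes written so far
                Instant t = entries.get(i);
                out.writeLong(t.getEpochSecond());  // 8 bytes (assumed encoding)
                out.writeInt(t.getNano());          // 4 bytes (assumed encoding)
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);      // not expected for in-memory streams
        }
        return new Snapshot(bytes.toByteArray(), offsets);
    }
}
```

In this model every entry costs 8 bytes of offset on top of its payload, and the offsets array is what allows the list to be cut at entry boundaries when the parallelism changes. That matches the explanation above of why the offsets scale with the number of entries, not with their size.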
Thanks! That would do it. I've disabled the operator for now. The purpose was to know the age of the job's state, so that we could consider its output in terms of how much context it knows. Regular state seemed insufficient because partitions might see their first traffic at different times. How would you go about implementing something like that? |
Did I understand you correctly that you use the union state to synchronize the per-partition state across all operators in order to obtain a global overview? If this is the case, then this will only work in case of a failover. Only then are all operators restarted with the union of all operators' state. If the job never fails, then there is never an exchange of state. If you really need a global view over your data, then you need to create an operator with a parallelism of 1 which records all the different timestamps. Another idea could be to use the broadcast state pattern [1]. You could have an operator which extracts the java.time.Instant values and outputs them as a side output and simply forwards the records on the main output. Then you could use the side output as the broadcast input and the main output as the normal input into the broadcast operator. The problem with this approach might be that you don't get order guarantees between the side and the main output. Cheers, Till
|
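Till's point that union state is only exchanged on restore can be illustrated with a standalone model of union redistribution. It is a sketch with hypothetical names, not Flink code: while the job runs, each subtask holds only its own list, and on restore every subtask receives the concatenation of all snapshotted lists.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of union redistribution on restore; not Flink code. */
final class UnionRestoreSketch {

    /**
     * Gives each of the newParallelism subtasks a copy of the union of all
     * per-subtask lists that were present in the snapshot.
     */
    static <T> List<List<T>> restoreUnion(List<List<T>> snapshotPerSubtask, int newParallelism) {
        List<T> union = new ArrayList<>();
        for (List<T> part : snapshotPerSubtask) {
            union.addAll(part);
        }
        List<List<T>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union));  // every subtask sees everything
        }
        return restored;
    }
}
```

In this model, a subtask that wrote its whole restored list back into state would snapshot the union again, so the total number of entries would grow with every restore. Whether that is what happened in the job discussed here is not stated in the thread.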
Thanks, makes sense. What about using the config mechanism? We're collecting and distributing some environment variables at startup; would it also work to include a timestamp with that? Also, would you be interested in a patch to note the caveat about union state metadata in the documentation? Jacob |
Hi Jacob, if you could create a patch for updating the union state metadata documentation, that would be great. I can help with reviewing and merging this patch. If the value stays fixed over the lifetime of the job and you know it before starting the job, then you could use the config mechanism. What won't work is if you need a different value for every restart. Updating the config after a recovery is not possible. Cheers, Till
|
Thanks, will do. I only want the timestamp to reset when the job comes up with no state. Checkpoint recoveries should keep the same value. Jacob |