Hi Devs,
@Users: I'm cc'ing the user ML to see if there are any users that are relying on this feature. Please comment here if that is the case. I'd like to discuss the deprecation and eventual removal of UnionList Operator State, aka Operator State with Union Redistribution. If you don't know what I'm talking about you can take a look in the documentation: [1]. It's not documented thoroughly because it started out as mostly an internal feature. The immediate main reason for removing this is also mentioned in the documentation: "Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors." The insidious part of this limitation is that you will only notice that there is a problem when it is too late. Checkpointing will still work and a program can continue when the state size is too big. The system will only fail when trying to restore from a snapshot that has union state that is too big. This could be fixed by working around that issue but I think there are more long-term issues with this type of state. I think we need to deprecate and remove API for state that is not tied to a key. Keyed state is easy to reason about, the system can re-partition state and also re-partition records and therefore scale the system in and out. Operator state, on the other hand is not tied to a key but an operator. This is a more "physical" concept, if you will, that potentially ties business logic closer to the underlying runtime execution model, which in turns means less degrees of freedom for the framework, that is Flink. This is future work, though, but we should start with deprecating union list state because it is the potentially most dangerous type of state. We currently use this state type internally in at least the StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However, we're in the process of hopefully getting rid of it there with our work on sources and sinks. Before we fully remove it, we should of course signal this to users by deprecating it. What do you think? Best, Aljoscha |
+1 to getting rid of non-keyed state as is in general and for union state in particular. I had a hard time to wrap my head around the semantics of non-keyed state when designing the rescale of unaligned checkpoint. The only plausible use cases are legacy source and sinks. Both should also be reworked in deprecated. My main question is how to represent state in these two cases. For sources, state should probably be bound to splits. In that regard, split (id) may act as a key. More generally, there should be probably a concept that supersedes keys and includes splits. For sinks, I can see two cases: - Either we are in a keyed context, then state should be bound to the key. - Or we are in a non-keyed context, then state might be bound to the split (?) in case of a source->sink chaining. - Maybe it should also be a new(?) concept like output partition. It's not clear to me if there are more cases and if we can always find a good way to bind state to some sort of key, especially for arbitrary communication patterns (which we may need to replace as well potentially). On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek <[hidden email]> wrote: Hi Devs, -- Arvid Heise | Senior Java Developer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng |
Generally +1 The one use case I've seen of union state I've seen in production (outside of sources and sinks) is as a "poor mans" broadcast state. This was obviously before that feature was added which is now a few years ago so I don't know if those pipelines still exist. FWIW, if they do the state processor api can provide a migration path as it supports rewriting union state as broadcast state. Seth On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise <[hidden email]> wrote: +1 to getting rid of non-keyed state as is in general and for union state |
-1
We use union state to generate sequences, each operator generates offset0 + number-of-tasks - task-index + task-specific-counter * number-of-tasks (e.g. for 2 instances of operator -one instance produce even number, another odd).
Last generated sequence number is stored union list state, on restart from where we should start to avoid collision with already generated numbers, to do saw we calculate offset0 as max over union list state.
Alexey
From: Seth Wiesman <[hidden email]>
Sent: Wednesday, September 9, 2020 9:37:03 AM To: dev <[hidden email]> Cc: Aljoscha Krettek <[hidden email]>; user <[hidden email]> Subject: Re: [DISCUSS] Deprecate and remove UnionList OperatorState Generally +1
The one use case I've seen of union state I've seen in production (outside of sources and sinks) is as a "poor mans" broadcast state. This was obviously before that feature was added which is now a few years ago so I don't know if those pipelines still
exist. FWIW, if they do the state processor api can provide a migration path as it supports rewriting union state as broadcast state.
Seth
On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise <[hidden email]> wrote:
+1 to getting rid of non-keyed state as is in general and for union state |
Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg sink use case, because we can't retrieve the checkpointId from the FunctionInitializationContext during the restore case. But we can move away from it if the restore context provides the checkpointId. On Sat, Sep 12, 2020 at 8:20 AM Alexey Trenikhun <[hidden email]> wrote:
|
On 14.09.20 02:20, Steven Wu wrote:
> Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg > sink use case, because we can't retrieve the checkpointId from > the FunctionInitializationContext during the restore case. But we can move > away from it if the restore context provides the checkpointId. Is the code for this available in the open source? I checked the Iceberg sink that's available in Iceberg proper and the one in Netflix Skunkworks: https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L228 Both of them are only using operator state, not the union variant. Best, Aljoscha |
Free forum by Nabble | Edit this page |