[DISCUSS] Deprecate and remove UnionList OperatorState

[DISCUSS] Deprecate and remove UnionList OperatorState

Aljoscha Krettek
Hi Devs,

@Users: I'm cc'ing the user ML to see if there are any users that are
relying on this feature. Please comment here if that is the case.

I'd like to discuss the deprecation and eventual removal of UnionList
Operator State, aka Operator State with Union Redistribution. If you
don't know what I'm talking about you can take a look in the
documentation: [1]. It's not documented thoroughly because it started
out as mostly an internal feature.

The most immediate reason for removing this is also mentioned in the
documentation: "Do not use this feature if your list may have high
cardinality. Checkpoint metadata will store an offset to each list
entry, which could lead to RPC framesize or out-of-memory errors." The
insidious part of this limitation is that you only notice the problem
when it is too late. Checkpointing will still work and a program can
keep running even when the state is too big. The system will only fail
when trying to restore from a snapshot whose union state is too big.
This particular issue could be worked around, but I think there are
more long-term issues with this type of state.
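
To make the difference between the two redistribution modes concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: the class and method names are made up for illustration, the round-robin dealing in the split case is a simplification, and it only counts the entries handed to subtasks on restore. It does not model checkpoint metadata or RPC frame sizes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of operator-state redistribution on restore; this is not Flink code. */
final class RedistributionSketch {

    /** Concatenates the lists snapshotted by all old subtasks. */
    static List<Long> concat(List<List<Long>> snapshot) {
        List<Long> all = new ArrayList<>();
        for (List<Long> perSubtask : snapshot) {
            all.addAll(perSubtask);
        }
        return all;
    }

    /** Split style: each entry goes to exactly one new subtask (round-robin here). */
    static List<List<Long>> restoreSplit(List<List<Long>> snapshot, int newParallelism) {
        List<Long> all = concat(snapshot);
        List<List<Long>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            restored.get(i % newParallelism).add(all.get(i));
        }
        return restored;
    }

    /** Union style: every new subtask receives the complete concatenated list. */
    static List<List<Long>> restoreUnion(List<List<Long>> snapshot, int newParallelism) {
        List<Long> all = concat(snapshot);
        List<List<Long>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(all));
        }
        return restored;
    }

    /** Total number of entries handed out across all subtasks. */
    static long totalEntries(List<List<Long>> perSubtask) {
        long total = 0;
        for (List<Long> entries : perSubtask) {
            total += entries.size();
        }
        return total;
    }

    public static void main(String[] args) {
        // Four old subtasks with three entries each.
        List<List<Long>> snapshot = new ArrayList<>();
        long next = 0;
        for (int subtask = 0; subtask < 4; subtask++) {
            List<Long> entries = new ArrayList<>();
            for (int j = 0; j < 3; j++) {
                entries.add(next++);
            }
            snapshot.add(entries);
        }
        System.out.println("split: " + totalEntries(restoreSplit(snapshot, 4)));
        System.out.println("union: " + totalEntries(restoreUnion(snapshot, 4)));
    }
}
```

In this model the split restore hands out as many entries as were snapshotted, while the union restore hands out that number multiplied by the new parallelism, which is why the cost only shows up at restore time.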

I think we need to deprecate and remove the API for state that is not
tied to a key. Keyed state is easy to reason about: the system can
re-partition state and also re-partition records, and can therefore
scale the system in and out. Operator state, on the other hand, is not
tied to a key but to an operator. This is a more "physical" concept, if
you will, that potentially ties business logic closer to the underlying
runtime execution model, which in turn means fewer degrees of freedom
for the framework, that is, Flink. This is future work, though; for now
we should start by deprecating union list state because it is
potentially the most dangerous type of state.
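
As an illustration of why keyed state can be re-partitioned mechanically, here is a minimal sketch of a key-group style assignment in plain Java. The names are hypothetical and the hashing is deliberately simplified (a real runtime would typically scramble the hash further), so treat it as a model of the idea rather than as Flink's implementation.

```java
/** Toy model of key-group based partitioning; names and hashing are illustrative. */
final class KeyGroupSketch {

    /** Maps a key to a fixed key group; this never changes when the job is rescaled. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to a subtask for the current parallelism (contiguous ranges). */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        String key = "user-42";
        int keyGroup = keyGroupFor(key, maxParallelism);
        // The same function routes both the record and its state, at any parallelism.
        for (int parallelism : new int[] {2, 4, 8}) {
            System.out.println("parallelism " + parallelism + " -> subtask "
                    + subtaskFor(keyGroup, maxParallelism, parallelism));
        }
    }
}
```

Because records and state are routed by the same function of the key, the system can move both together when the parallelism changes. State that is tied to an operator instance has no such function to rely on.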

We currently use this state type internally in at least the
StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
we're in the process of hopefully getting rid of it there with our work
on sources and sinks. Before we fully remove it, we should of course
signal this to users by deprecating it.

What do you think?

Best,
Aljoscha

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Arvid Heise
+1 to getting rid of non-keyed state in general and of union state in particular. I had a hard time wrapping my head around the semantics of non-keyed state when designing the rescaling of unaligned checkpoints.

The only plausible use cases are legacy sources and sinks. Both should also be reworked and deprecated.

My main question is how to represent state in these two cases. For sources, state should probably be bound to splits. In that regard, the split (id) may act as a key. More generally, there should probably be a concept that supersedes keys and includes splits.

For sinks, I can see two cases:
- Either we are in a keyed context, then state should be bound to the key.
- Or we are in a non-keyed context, then state might be bound to the split (?) in case of a source->sink chaining.
- Maybe it should also be a new(?) concept like output partition.

It's not clear to me if there are more cases and if we can always find a good way to bind state to some sort of key, especially for arbitrary communication patterns (which we may need to replace as well potentially).
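
A minimal sketch of the "split id as key" idea for sources, in plain Java. Everything here is hypothetical: the class name, the use of a per-split offset as the state, and the round-robin assignment over sorted split ids are assumptions made for illustration, not an existing or proposed Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of source state bound to splits; not an actual Flink interface. */
final class SplitStateSketch {

    /**
     * Assigns each split, together with its state, to exactly one reader.
     * Splits are dealt round-robin in sorted order so the result is deterministic.
     */
    static List<Map<String, Long>> assign(Map<String, Long> offsetBySplit, int parallelism) {
        List<Map<String, Long>> perReader = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perReader.add(new TreeMap<>());
        }
        int position = 0;
        for (Map.Entry<String, Long> entry : new TreeMap<>(offsetBySplit).entrySet()) {
            perReader.get(position % parallelism).put(entry.getKey(), entry.getValue());
            position++;
        }
        return perReader;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new TreeMap<>();
        offsets.put("split-a", 10L);
        offsets.put("split-b", 20L);
        offsets.put("split-c", 30L);
        // Rescaling only changes which reader owns a split; the state follows the split.
        System.out.println(assign(offsets, 2));
        System.out.println(assign(offsets, 3));
    }
}
```

The point of the sketch is that once state is attached to a split rather than to a subtask, rescaling reduces to reassigning splits, and no reader needs to see the state of all others.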

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Seth Wiesman
Generally +1

The one use case of union state I've seen in production (outside of sources and sinks) is as a "poor man's" broadcast state. This was obviously before that feature was added, which is now a few years ago, so I don't know if those pipelines still exist. FWIW, if they do, the State Processor API can provide a migration path, as it supports rewriting union state as broadcast state.

Seth

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Alexey Trenikhun
-1

We use union state to generate sequences: each operator instance generates offset0 + number-of-tasks - task-index + task-specific-counter * number-of-tasks (e.g. for 2 instances of the operator, one instance produces even numbers, the other odd). The last generated sequence number is stored in union list state. On restart we need to know where to start from to avoid collisions with already generated numbers; to do so, we calculate offset0 as the max over the union list state.
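
A minimal sketch of this scheme in plain Java, with the union list state modelled as an ordinary list passed to a hypothetical `restore` method. The class and method names are assumptions for illustration; only the formula and the "max over the union" rule come from the description above.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of one parallel sequence generator; the Flink state handling is omitted. */
final class SequenceSketch {
    private final long offset0;
    private final int numberOfTasks;
    private final int taskIndex;
    private long counter = 0;
    private long lastGenerated; // the value each task would put into union list state

    SequenceSketch(long offset0, int numberOfTasks, int taskIndex) {
        this.offset0 = offset0;
        this.numberOfTasks = numberOfTasks;
        this.taskIndex = taskIndex;
        this.lastGenerated = offset0;
    }

    /** On restore every task sees all last-generated values and starts above their max. */
    static SequenceSketch restore(List<Long> unionOfLastGenerated, int numberOfTasks, int taskIndex) {
        long max = 0;
        for (long value : unionOfLastGenerated) {
            max = Math.max(max, value);
        }
        return new SequenceSketch(max, numberOfTasks, taskIndex);
    }

    /** offset0 + number-of-tasks - task-index + task-specific-counter * number-of-tasks */
    long next() {
        lastGenerated = offset0 + numberOfTasks - taskIndex + counter * numberOfTasks;
        counter++;
        return lastGenerated;
    }

    long lastGenerated() {
        return lastGenerated;
    }

    public static void main(String[] args) {
        SequenceSketch task0 = new SequenceSketch(0, 2, 0);
        SequenceSketch task1 = new SequenceSketch(0, 2, 1);
        System.out.println(task0.next() + " " + task0.next() + " " + task1.next());
        // Restart with a different parallelism, using the union of the last values.
        List<Long> union = Arrays.asList(task0.lastGenerated(), task1.lastGenerated());
        for (int i = 0; i < 3; i++) {
            System.out.println(restore(union, 3, i).next());
        }
    }
}
```

Since the smallest number any task can produce after a restore is offset0 + 1, every new number is larger than everything generated before, even if the parallelism changes. The scheme does depend on each task seeing the values of all other tasks, which is what union redistribution provides.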

Alexey


Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Steven Wu
Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg sink use case, because we can't retrieve the checkpointId from the FunctionInitializationContext during the restore case. But we can move away from it if the restore context provides the checkpointId.

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

Aljoscha Krettek
On 14.09.20 02:20, Steven Wu wrote:
> Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg
> sink use case, because we can't retrieve the checkpointId from
> the FunctionInitializationContext during the restore case. But we can move
> away from it if the restore context provides the checkpointId.

Is the code for this available as open source? I checked the Iceberg
sink that's available in Iceberg proper and the one in Netflix
Skunkworks:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L228

Both of them only use operator state, not the union variant.

Best,
Aljoscha