MapState with List Type for values


Aaron Langford
Hello Flink Community,

I have a question about using MapState with lists as values. Here is a description of the use case:

I have an operator over a keyed stream that, for each incoming record, looks into some state to determine whether a value has arrived yet. If the value has not arrived, the record needs to be enqueued until it does. When that value does arrive, the queued records need to be flushed and the associated list state cleared.

Here's an emoji representation of the stream:
---------------------------------------------------------------------
πŸΏπŸ¦žπŸΏπŸ¦„πŸΏπŸ¦„πŸ¦„πŸ‰ ...
---------------------------------------------------------------------
Where 🐿 must wait for at least 1 🦞 to be output (otherwise be queued) andΒ πŸ¦„Β must wait forΒ πŸ‰to be output (otherwise be queued). If you can't tell, this is basically/sort of a join without a window.

The output should be something abstractly like this:

---------------------------------------------------------------------
{🐿,🦞}{🐿,🦞}{🐿,🦞}{🦄,🐉}{🦄,🐉}{🦄,🐉}...
---------------------------------------------------------------------

Many records might be enqueued while waiting for a given value. Many records may be waiting for many different values, but any enqueued record will only be waiting for a record of one type to show up.
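
The rule described above can be modelled in a few lines of plain Java, which may help when checking an implementation against the expected output. This is only a sketch with no Flink classes involved: the names `squirrel`, `lobster`, `unicorn` and `dragon` are hypothetical ASCII stand-ins for the four symbols in the stream, and `WaitingJoinModel` and its rule table are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the buffering rule; it does not use any Flink API.
final class WaitingJoinModel {
    // Hypothetical rule table: which value each waiting record type needs.
    private static final Map<String, String> AWAITS = Map.of(
            "squirrel", "lobster",
            "unicorn", "dragon");

    private final Set<String> arrived = new HashSet<>();
    private final Map<String, List<String>> queued = new HashMap<>();

    /** Processes one record and returns the pairs it releases (possibly none). */
    List<String> process(String record) {
        List<String> out = new ArrayList<>();
        String awaited = AWAITS.get(record);
        if (awaited == null) {
            // The record is itself an awaited value: remember it and flush its queue.
            arrived.add(record);
            List<String> waiting = queued.remove(record);
            if (waiting != null) {
                for (String w : waiting) {
                    out.add("{" + w + "," + record + "}");
                }
            }
        } else if (arrived.contains(awaited)) {
            // The awaited value is already there: emit immediately.
            out.add("{" + record + "," + awaited + "}");
        } else {
            // Otherwise queue the record under the value it is waiting for.
            queued.computeIfAbsent(awaited, k -> new ArrayList<>()).add(record);
        }
        return out;
    }

    /** Runs a whole stream through a fresh model and collects the output. */
    static List<String> run(List<String> stream) {
        WaitingJoinModel model = new WaitingJoinModel();
        List<String> out = new ArrayList<>();
        for (String r : stream) {
            out.addAll(model.process(r));
        }
        return out;
    }
}
```

Feeding the example stream through `WaitingJoinModel.run` yields three squirrel/lobster pairs followed by three unicorn/dragon pairs, which matches the abstract output shown above.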

Records on this stream are keyed by some shared parent key, and I have reasons to avoid keying the stream on the "join key" as it were. Namely I'm getting a CDC stream with a lot of different tables, and I want to avoid a topology with N operators for N different tables if I can.

If I lean on MapState<KEY, Seq<VALUE>> to get this done for me, then my job's performance suffers considerably. I believe one of the biggest bottlenecks is that each time I need to interact with a Seq<VALUE> (for appends, for example), I must deserialize the entire list.
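
To make the suspected bottleneck concrete, here is a small plain-Java sketch of a store that keeps each map value as a single serialized blob, so every append decodes and re-encodes the whole list. It assumes the state backend stores values in serialized form (as a RocksDB-style backend would); `SerializedListMap` and its counter are hypothetical names, and the encoding is not Flink's serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for a backend that stores each map value as one serialized blob.
final class SerializedListMap {
    private final Map<String, byte[]> store = new HashMap<>();
    long elementsDeserialized = 0;

    /** Appends one value; the whole list is decoded and then encoded again. */
    void append(String key, String value) {
        List<String> list = read(key);
        list.add(value);
        write(key, list);
    }

    /** Decodes the full list stored under key, counting every element read. */
    List<String> read(String key) {
        List<String> list = new ArrayList<>();
        byte[] bytes = store.get(key);
        if (bytes == null) {
            return list;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                list.add(in.readUTF());
                elementsDeserialized++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return list;
    }

    private void write(String key, List<String> list) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(list.size());
            for (String s : list) {
                out.writeUTF(s);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        store.put(key, buf.toByteArray());
    }
}
```

With this layout, n appends to one key decode 0 + 1 + ... + (n - 1) = n(n - 1)/2 elements in total, so the cost of building a queue grows quadratically with its length.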

Is there a way to set up a MapState whose value is a ListState? Or is there any guidance for how I might serialize/deserialize a list type in the MapState in such a way that appends aren't so expensive? Open to other suggestions/approaches as well.

Aaron

Re: MapState with List Type for values

Yun Tang
Hi Aaron

You cannot set up a map state whose value is a list state, but you can use a list as its value. However, I think that would also suffer from serialization/deserialization overhead whenever you append to the list.

What is the KEY in your map state? If you could use the emoji as your KEY, you could do something like this:

After keyBy, we would first process a 🐿, and the map state would become <🐿, 1>. Then we process another 🐿 and get <🐿, 2>. Lastly, we process a 🦞 and look up the map to learn that two 🐿 are already queued. At this point we could produce {🐿, 🦞} and set the map state back to <🐿, 1>. If you could follow this logic, the serialization/deserialization of Seq<VALUE> could be greatly reduced.
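
The walk-through above can be sketched as a counter per waiting record type. This is a plain-Java model that follows the steps literally: a `HashMap` stands in for a `MapState<String, Integer>`, and `CountingQueueModel`, `enqueue` and `releaseOne` are hypothetical names. Note that a counter keeps no payload, so this only fits when queued records of the same type are interchangeable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "map-key = record type, value = number queued".
final class CountingQueueModel {
    // Stand-in for MapState<String, Integer>: waiting record type -> queued count.
    private final Map<String, Integer> counts = new HashMap<>();

    /** Records that one more record of this type is waiting. */
    void enqueue(String waitingType) {
        counts.merge(waitingType, 1, Integer::sum);
    }

    /** Releases one queued record of waitingType, if any, paired with arrivedValue. */
    List<String> releaseOne(String waitingType, String arrivedValue) {
        List<String> out = new ArrayList<>();
        Integer n = counts.get(waitingType);
        if (n == null) {
            return out; // nothing is waiting for this value
        }
        out.add("{" + waitingType + "," + arrivedValue + "}");
        if (n == 1) {
            counts.remove(waitingType);
        } else {
            counts.put(waitingType, n - 1);
        }
        return out;
    }

    /** Number of records of this type still queued. */
    int queued(String waitingType) {
        return counts.getOrDefault(waitingType, 0);
    }
}
```

Each enqueue or release touches a single small map entry, which is where the saving over rewriting a whole Seq<VALUE> would come from.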

Best
Yun Tang

From: Aaron Langford <[hidden email]>
Sent: Wednesday, December 18, 2019 6:47
To: [hidden email] <[hidden email]>
Subject: MapState with List Type for values
 
Re: MapState with List Type for values

Aaron Langford
So the suggestion as I read it is to have some kind of shared queue for all waiting records. This allows for use of ListState, and cheap appends.

Then the map state counts how many records of each type are queued.

When I finally get a record that allows me to remove elements from the queue, I can iterate through the ListState until I have flushed N elements downstream, where N is the number from the mapstate.

Does that read back like what you were suggesting?


The question then becomes how to remove those flushed elements from the ListState. ListState doesn't support removing items by index, or like a linked list.

In which case, maybe this calls for two ListState objects. Each time I need to flush elements, I can move elements that don't match from one ListState to the other.

That means I scan through all records in the ListState each time I receive a record that has elements waiting...
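
A rough plain-Java sketch of this shared-queue idea is below. An `ArrayList` stands in for the ListState and a `HashMap` for the counting MapState; `SharedQueueModel`, `Pending`, `enqueue` and `flush` are hypothetical names. The "second list" is simply the list of survivors that replaces the first after a scan.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of one shared queue plus per-value counts.
final class SharedQueueModel {
    /** A queued record together with the value it is waiting for. */
    static final class Pending {
        final String awaited;
        final String payload;

        Pending(String awaited, String payload) {
            this.awaited = awaited;
            this.payload = payload;
        }
    }

    private List<Pending> pending = new ArrayList<>();             // ~ ListState
    private final Map<String, Integer> counts = new HashMap<>();   // ~ MapState

    /** Cheap append: one element added, one counter bumped. */
    void enqueue(String awaited, String payload) {
        pending.add(new Pending(awaited, payload));
        counts.merge(awaited, 1, Integer::sum);
    }

    /** Emits every record waiting for arrived and keeps the rest queued. */
    List<String> flush(String arrived) {
        List<String> out = new ArrayList<>();
        if (counts.remove(arrived) == null) {
            return out; // the count tells us the scan can be skipped
        }
        List<Pending> keep = new ArrayList<>();
        for (Pending p : pending) {
            if (p.awaited.equals(arrived)) {
                out.add("{" + p.payload + "," + arrived + "}");
            } else {
                keep.add(p); // survivors move to the second list
            }
        }
        pending = keep; // the second list replaces the first
        return out;
    }

    int pendingSize() {
        return pending.size();
    }
}
```

Appends stay cheap, but every flush that finds something waiting still walks the whole queue, which is the scan cost raised above.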


I'll have to think through this and see if the two-ListState approach is feasible.

Is there anyone working on something like a TreeState, memtable/SortedList state, or AppendingState that supports linked-list-like remove operations? Some of these things might be useful in helping along this implementation.

Aaron

On Wed, Dec 18, 2019 at 8:53 AM Yun Tang <[hidden email]> wrote:

Re: MapState with List Type for values

Yun Tang
Hi Aaron

There is a runtime key, which is used by the keyBy, and a map-key inside your map state. Generally speaking, the runtime key is not the same as the map-key. If you could store your emoji as the map-key in your state, no list state is necessary. The basic idea is a bit like the join implemented in the flink-table API [1]. I think you could look at the implementation of the SQL join, and how it uses state, for more hints.
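
One possible reading of this hint, sketched in plain Java: make each queued record its own map-key and store a duplicate count as the value, so an append touches one entry instead of a whole list. This interpretation is an assumption, not something the thread confirms; a `LinkedHashMap` stands in for a `MapState<Pending, Integer>`, and `RecordKeyedModel` and `Pending` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Plain-Java model of "the record itself is the map-key, the value is a count".
final class RecordKeyedModel {
    /** Map-key: the awaited value plus the queued record's payload. */
    static final class Pending {
        final String awaited;
        final String payload;

        Pending(String awaited, String payload) {
            this.awaited = awaited;
            this.payload = payload;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Pending)) {
                return false;
            }
            Pending p = (Pending) o;
            return awaited.equals(p.awaited) && payload.equals(p.payload);
        }

        @Override
        public int hashCode() {
            return Objects.hash(awaited, payload);
        }
    }

    // Stand-in for MapState<Pending, Integer>: record -> number of duplicates queued.
    private final Map<Pending, Integer> state = new LinkedHashMap<>();

    /** Append = a single map put; no list is read or rewritten. */
    void enqueue(String awaited, String payload) {
        state.merge(new Pending(awaited, payload), 1, Integer::sum);
    }

    /** Emits and removes every entry that was waiting for arrived. */
    List<String> flush(String arrived) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Pending, Integer>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Pending, Integer> e = it.next();
            if (e.getKey().awaited.equals(arrived)) {
                for (int i = 0; i < e.getValue(); i++) {
                    out.add("{" + e.getKey().payload + "," + arrived + "}");
                }
                it.remove();
            }
        }
        return out;
    }

    int size() {
        return state.size();
    }
}
```

In this sketch a flush still iterates over all entries held under the runtime key, so the gain is on the append side only.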


Best
Yun Tang


From: Aaron Langford <[hidden email]>
Sent: Thursday, December 19, 2019 2:22
To: Yun Tang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: MapState with List Type for values
 