One way I can think of is:
1. Apply a fake keyBy to the stream so that every record gets the same key.
2. Use MapState in a KeyedProcessFunction on the keyed stream from step 1.
Is this a good solution? What are the implications for parallelism? Are there better ways?
On 2019/05/07 00:15:59, Averell wrote:
From my understanding, a fake keyBy (stream.keyBy(r => "dummyString")) means that only one slot would handle all the data.
Would a broadcast function [1] work for your case?
Regards,
Averell
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
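To see why a constant key funnels everything into one slot, here is a toy Java model (not Flink code) of key-based routing: each record is sent to the parallel subtask chosen from its key, and each subtask keeps one MapState-like map per key, as a KeyedProcessFunction with MapState would. The names (ConstantKeyRouting, subtaskFor, route, busySubtasks) are hypothetical, and subtaskFor uses plain hashCode modulo parallelism as a simplifying assumption; Flink itself first hashes keys into key groups and then assigns key groups to subtasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model, not Flink code: keyBy-style routing plus per-key "MapState".
final class ConstantKeyRouting {

    // Simplified key -> subtask assignment (assumption; Flink goes through key groups).
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // For each subtask: key -> (record -> count), mimicking one MapState per key.
    static List<Map<Object, Map<String, Integer>>> route(
            List<String> records, Function<String, Object> keySelector, int parallelism) {
        List<Map<Object, Map<String, Integer>>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new HashMap<>());
        }
        for (String record : records) {
            Object key = keySelector.apply(record);
            subtasks.get(subtaskFor(key, parallelism))
                    .computeIfAbsent(key, k -> new HashMap<>())
                    .merge(record, 1, Integer::sum);
        }
        return subtasks;
    }

    // Counts the subtasks that received at least one record.
    static int busySubtasks(List<Map<Object, Map<String, Integer>>> subtasks) {
        int busy = 0;
        for (Map<Object, Map<String, Integer>> subtask : subtasks) {
            if (!subtask.isEmpty()) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "c", "d", "e", "a");
        // Fake key: every record maps to "dummyString", so one subtask does all the work.
        System.out.println(busySubtasks(route(records, r -> "dummyString", 4))); // 1
        // Keying by the record itself spreads the same records over several subtasks.
        System.out.println(busySubtasks(route(records, r -> r, 4))); // 4
    }
}
```

Whatever the parallelism, a single key always maps to a single subtask, so the other slots of the keyed operator stay idle.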
On Tue, May 7, 2019 at 5:57 PM, an0 wrote:
But I only have one stream; there is nothing to connect it to.
On 2019/05/08 14:42:36, Till Rohrmann wrote:
Hi,
if you want to increase the parallelism, you could also pick a key randomly from a set of keys. The price you would pay is a shuffle operation (network I/O), which would not be needed if you used the unkeyed stream with operator list state.
However, with keyed state you could also use Flink's RocksDBKeyedStateBackend, which lets state go out of core (onto local disk instead of the JVM heap) if it grows very large.
Cheers,
Till
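Picking a random key from a fixed set can be sketched with the same kind of toy model (not Flink code). The class and method names are hypothetical, the key-to-subtask assignment is again simplified to hashCode modulo parallelism, and the key set is assumed to be the integers 0 to numKeys - 1.

```java
import java.util.Arrays;
import java.util.Random;

// Toy model, not Flink code: spreading records by picking a random key from a fixed set.
final class RandomKeySpread {

    // Simplified key -> subtask assignment (assumption; Flink goes through key groups).
    static int subtaskFor(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    // Returns how many records each subtask receives when every record
    // is given a random key from {0, 1, ..., numKeys - 1}.
    static int[] recordsPerSubtask(int numRecords, int numKeys, int parallelism, long seed) {
        Random random = new Random(seed);
        int[] load = new int[parallelism];
        for (int i = 0; i < numRecords; i++) {
            int key = random.nextInt(numKeys);
            load[subtaskFor(key, parallelism)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        // One key: equivalent to the fake keyBy, all records land on one subtask.
        System.out.println(Arrays.toString(recordsPerSubtask(1000, 1, 4, 42L))); // [1000, 0, 0, 0]
        // Eight keys over four subtasks: the load is spread across all of them.
        System.out.println(Arrays.toString(recordsPerSubtask(1000, 8, 4, 42L)));
    }
}
```

Each random key has its own keyed state, so in a real job the MapState of any one key would only hold the part of the data that was routed to it.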
On Wed, 8 May 2019 at 20:44, an0 wrote:
I switched to using operator list state. It is clearer. It is also supported by RocksDBKeyedStateBackend, isn't it?
On 2019/05/09 07:39:34, Fabian Hueske wrote:
Hi,
Yes, IMO it is clearer. However, you should be aware that operator state is kept only on the JVM heap (not in RocksDB).
Best, Fabian
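The following toy Java model (not Flink code) shows what operator list state amounts to: each parallel instance keeps its own plain list on the heap, with no key-based shuffle. In Flink's API this usually corresponds to a CheckpointedFunction that obtains a ListState via getOperatorStateStore().getListState(...) in initializeState and copies its buffer into it in snapshotState. The class names are hypothetical, and redistribute assumes an even split of the concatenated lists into contiguous parts when parallelism changes; Flink's exact split may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink code: operator list state held per parallel instance.
final class OperatorListStateModel {

    // One parallel instance: its working state is a plain list on the JVM heap.
    static final class Subtask {
        final List<String> buffer = new ArrayList<>();

        // Copy the buffer, as a snapshotState() implementation would copy it into ListState.
        List<String> snapshot() {
            return new ArrayList<>(buffer);
        }
    }

    // On restore with a new parallelism: concatenate all snapshots and split the result
    // into newParallelism contiguous, near-equal parts (assumed even-split behavior).
    static List<List<String>> redistribute(List<List<String>> snapshots, int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> snapshot : snapshots) {
            all.addAll(snapshot);
        }
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        List<List<String>> parts = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            parts.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return parts;
    }

    public static void main(String[] args) {
        Subtask first = new Subtask();
        Subtask second = new Subtask();
        first.buffer.addAll(List.of("a", "b", "c"));
        second.buffer.addAll(List.of("d", "e"));
        // Scale from 2 to 3 parallel instances.
        System.out.println(redistribute(List.of(first.snapshot(), second.snapshot()), 3)); // [[a, b], [c, d], [e]]
    }
}
```

Because each instance only touches its own list, records never have to be shuffled by key; the trade-off is that the whole list lives in memory.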
On Thu, 9 May 2019 at 16:10, an0 wrote:
Thanks, I didn't know that. But it is checkpointed to RocksDB, isn't it? BTW, is this special treatment of operator state documented anywhere?
On 2019/05/10 10:20:40, Fabian Hueske wrote:
Hi,
RocksDB is only used as a local state store. Operator state is not stored in RocksDB; it lives only on the TaskManager (TM) JVM heap. When a checkpoint is taken, the keyed state from RocksDB and the operator state from the heap are both copied to a persistent data store (HDFS, S3, ...).
I was trying to find the documentation that explains how operator state is managed, but couldn't find it. I'll create a Jira to fix that.
Best, Fabian
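Where each kind of state lives can be summarized in a greatly simplified toy model (not Flink code; it ignores how Flink actually writes checkpoint files). Keyed state sits in a local store, represented by a TreeMap standing in for RocksDB. Operator state sits in a plain heap list. A checkpoint copies both into a snapshot destined for durable storage such as HDFS or S3. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Greatly simplified toy model, not Flink code: what one task copies on a checkpoint.
final class CheckpointCopyModel {

    // Hypothetical snapshot that would be written to a durable store (HDFS, S3, ...).
    static final class Snapshot {
        final Map<String, Long> keyedState;
        final List<String> operatorState;

        Snapshot(Map<String, Long> keyedState, List<String> operatorState) {
            this.keyedState = keyedState;
            this.operatorState = operatorState;
        }
    }

    // Stand-in for the task's local RocksDB instance holding keyed state.
    final TreeMap<String, Long> localKeyedStore = new TreeMap<>();
    // Operator state: ordinary objects on the TaskManager JVM heap.
    final List<String> heapOperatorState = new ArrayList<>();

    // A checkpoint copies both kinds of state; later updates do not change the snapshot.
    Snapshot checkpoint() {
        return new Snapshot(Map.copyOf(localKeyedStore), List.copyOf(heapOperatorState));
    }

    public static void main(String[] args) {
        CheckpointCopyModel task = new CheckpointCopyModel();
        task.localKeyedStore.put("dummyString", 3L);
        task.heapOperatorState.add("buffered-record");
        Snapshot snapshot = task.checkpoint();
        task.heapOperatorState.clear();
        System.out.println(snapshot.keyedState + " " + snapshot.operatorState); // {dummyString=3} [buffered-record]
    }
}
```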
Got it, thanks.