Flink: For terabytes of keyed state.

Gowri Sundaram
Hello all,
We have read in multiple sources that Flink has been used for use cases with terabytes of application state.

We are considering using Flink for a similar use case with keyed state in the range of 20 to 30 TB. We had a few questions regarding the same.

  • Is Flink a good option for data at this scale? We are considering using RocksDB as the state backend.
  • What happens when we want to add a node to the cluster?
    • As per our understanding, if we have 10 nodes in our cluster with 20 TB of state, adding a node would require the entire 20 TB of data to be shipped again from the external checkpoint remote storage to the TaskManager nodes.
    • Assuming a network speed of 1 GB/s per node, and assuming all nodes can read their respective 2 TB of state in parallel, this would mean a minimum downtime of about half an hour. This also assumes that the throughput of the remote storage does not become the bottleneck.
    • Is there any way to reduce this estimated downtime?
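
The downtime estimate above can be checked with a small calculation. This is only a sketch under the poster's own assumptions (state spread evenly over 10 nodes, all nodes reading in parallel, remote storage not a bottleneck); the function name and decimal units are illustrative choices. It evaluates the figure for both readings of the unit: the half-hour estimate matches roughly 1 gigabyte per second per node, while 1 gigabit per second would take about 4.4 hours.

```python
def restore_seconds(state_bytes_per_node: float, bytes_per_second: float) -> float:
    """Lower bound on the time for one node to pull its share of state."""
    return state_bytes_per_node / bytes_per_second


TB = 10**12                     # decimal terabyte, in bytes
state_per_node = 20 * TB / 10   # 20 TB spread evenly over 10 nodes

gigabit_per_s = 10**9 / 8       # 1 Gb/s expressed in bytes per second
gigabyte_per_s = 10**9          # 1 GB/s expressed in bytes per second

for label, rate in [("1 Gb/s", gigabit_per_s), ("1 GB/s", gigabyte_per_s)]:
    minutes = restore_seconds(state_per_node, rate) / 60
    print(f"{label}: about {minutes:.0f} minutes per node")
```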

Thank you!

Re: Flink: For terabytes of keyed state.

Congxian Qiu
Hi

1. In my experience, Flink can support state this large; you can set an appropriate parallelism for the stateful operator. With RocksDB, you may need to pay attention to disk performance.
2. Inside Flink, state is partitioned into key-groups, and each parallel task instance holds multiple key-groups. Flink does not need to restart when you add a node to the cluster, but every time a job restarts from a savepoint/checkpoint (or on failover), Flink needs to redistribute the checkpoint data. This step can be skipped if the restart is a failover and local recovery[1] is enabled.
3. To speed up state upload/download, you can refer to the multi-threaded upload/download support[2][3].
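
To illustrate point 2, here is a minimal sketch of how key-groups could be split into contiguous ranges per parallel subtask. The formulas are modeled on my understanding of Flink's key-group range assignment and should be treated as an assumption, not as the library's API; the function names and the maximum parallelism of 128 are illustrative. The point is that after a rescale each subtask only needs the state for its own key-group range, not the whole job state.

```python
def key_group_range(max_parallelism: int, parallelism: int, subtask: int) -> range:
    """Contiguous key-group range owned by one subtask (assumed formula)."""
    start = (subtask * max_parallelism + parallelism - 1) // parallelism
    end = ((subtask + 1) * max_parallelism - 1) // parallelism  # inclusive
    return range(start, end + 1)


def owner(max_parallelism: int, parallelism: int, key_group: int) -> int:
    """Subtask index that owns a given key-group (assumed formula)."""
    return key_group * parallelism // max_parallelism


MAX_PARALLELISM = 128  # illustrative value, fixed for the lifetime of the job

# Compare the layout before and after adding one parallel subtask.
for parallelism in (10, 11):
    sizes = [len(key_group_range(MAX_PARALLELISM, parallelism, s))
             for s in range(parallelism)]
    print(f"parallelism={parallelism}: key-groups per subtask = {sizes}")

# Key-groups whose owning subtask index changes when rescaling from 10 to 11.
moved = [kg for kg in range(MAX_PARALLELISM)
         if owner(MAX_PARALLELISM, 10, kg) != owner(MAX_PARALLELISM, 11, kg)]
print(f"{len(moved)} of {MAX_PARALLELISM} key-groups change owner")
```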


Best,
Congxian

In reply to Gowri Sundaram's message of Fri, May 1, 2020 at 6:29 PM.

Re: Flink: For terabytes of keyed state.

Gowri Sundaram
Hi Congxian,
Thank you so much for your response, that really helps!

In your experience, how long does it take for Flink to redistribute terabytes of state data on node addition or node failure?

Thanks!

In reply to Congxian Qiu's message of Sun, May 3, 2020 at 6:56 PM.

Re: Flink: For terabytes of keyed state.

Congxian Qiu
Hi

In my experience, you should care about the state size of a single task (not the state size of the whole job). The download speed for a single thread is almost 100 MB/s (this may vary between environments). I do not have much performance data on loading state into RocksDB (we use an internal KV store at my company), but in my experience loading state into RocksDB will not be slower than downloading it.
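
As a rough sketch of that reasoning, the per-task download time can be estimated from the per-task state size and the per-thread download speed. The parallelism of 200 and the 4 download threads are hypothetical numbers, and linear scaling with the thread count is an assumption that real storage and disks may not satisfy.

```python
TB = 10**12  # decimal terabyte, in bytes
MB = 10**6   # decimal megabyte, in bytes


def download_seconds(total_state_bytes: int, parallelism: int,
                     threads_per_task: int,
                     per_thread_bytes_per_s: int = 100 * MB) -> float:
    """Estimated time for one task to download its share of the state.

    Assumes state is spread evenly over the parallel tasks and that
    throughput scales linearly with the number of download threads.
    """
    state_per_task = total_state_bytes / parallelism
    return state_per_task / (threads_per_task * per_thread_bytes_per_s)


# Hypothetical setup: 20 TB of keyed state, parallelism 200, 4 threads per task.
seconds = download_seconds(20 * TB, parallelism=200, threads_per_task=4)
print(f"about {seconds / 60:.1f} minutes per task")
```

With a single thread per task the same setup would take four times as long, which is why the per-task state size matters more than the total.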

Best,
Congxian

In reply to Gowri Sundaram's message of Sun, May 3, 2020 at 11:25 PM.

Re: Flink: For terabytes of keyed state.

Gowri Sundaram
Hi Congxian, 
Thank you so much for your response! We will go ahead and do a POC to test out how Flink performs at scale.

Regards,
- Gowri

In reply to Congxian Qiu's message of Wed, May 6, 2020 at 8:34 AM.

Re: Flink: For terabytes of keyed state.

Congxian Qiu
Hi Gowri

Please let us know if you run into any problems.

Best,
Congxian

In reply to Gowri Sundaram's message of Wed, May 6, 2020 at 1:53 PM.