Questions Flink DataStream in BATCH execution mode scalability advice


Questions Flink DataStream in BATCH execution mode scalability advice

Marco Villalobos-2
Questions Flink DataStream in BATCH execution mode scalability advice.

Here is the problem that I am trying to solve.

Input is an S3 bucket directory with about 500 GB of data spread across many files. The instance that I am running on has only 50 GB of EBS storage. The data is time series data: imagine name, value, timestamp.

I must average time_series.value by time_series.name over a 15-minute tumbling window. Upon aggregation, time_series.timestamp is rounded up to the next quarter hour. I key by tag name and 15-minute interval.
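
To make this concrete, here is a rough sketch of the aggregation I have in mind (the TimeSeriesPoint POJO, its parse() helper, and the AverageAggregate function are illustrative stand-ins, not my actual code):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    DataStream<TimeSeriesPoint> points = env
        .readTextFile("s3://my-bucket/my-prefix/")      // path is illustrative
        .map(TimeSeriesPoint::parse);                   // parse(String) is an assumed helper

    DataStream<TimeSeriesPoint> averaged = points
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<TimeSeriesPoint>forMonotonousTimestamps()
                .withTimestampAssigner((point, ts) -> point.timestamp))
        .keyBy(point -> point.name)
        .window(TumblingEventTimeWindows.of(Time.minutes(15)))
        // AverageAggregate computes the per-key mean and stamps the result with
        // the window end, i.e. the timestamp rounded up to the quarter.
        .aggregate(new AverageAggregate());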

After aggregation, I must forward-fill the missing quarters for each time_series.name. Currently, this forward-fill operator is keyed only by time_series.name. Does this mean that in batch mode, all of the time series with the same time_series.name within the 500 GB of files must fit in memory?
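
For context, the forward-fill operator looks roughly like this sketch (class and field names are illustrative, and it assumes the aggregated records of each key arrive in ascending time order):

    public static class ForwardFill
            extends KeyedProcessFunction<String, AggregatedPoint, AggregatedPoint> {

        private transient ValueState<AggregatedPoint> lastSeen;

        @Override
        public void open(Configuration parameters) {
            lastSeen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("last-seen", AggregatedPoint.class));
        }

        @Override
        public void processElement(AggregatedPoint current, Context ctx,
                                   Collector<AggregatedPoint> out) throws Exception {
            AggregatedPoint previous = lastSeen.value();
            if (previous != null) {
                long quarter = 15 * 60 * 1000L;
                // Repeat the previous value for every missing 15-minute slot.
                for (long t = previous.timestamp + quarter; t < current.timestamp; t += quarter) {
                    out.collect(new AggregatedPoint(current.name, previous.value, t));
                }
            }
            out.collect(current);
            lastSeen.update(current);
        }
    }

The state kept per time_series.name here is only the last record, so my question is really whether batch mode forces anything more than that to be held in memory.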

The results are saved in an RDBMS.

If this job somehow reads all 500 GB before sending it to the first operator, where is the data stored?

Now, considering that the EMR node has only 50 GB of EBS (that is, disk storage), is there a way to configure Flink to store its intermediate results in S3?

When the job failed, I saw this exception in the log: "Recovery is suppressed by NoRestartBackoffTimeStrategy." Is there a way to configure this to recover?
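
I have not configured any restart strategy, so I am guessing I need to set one explicitly, perhaps something like the snippet below (the numbers are just examples) or the equivalent restart-strategy entries in flink-conf.yaml, but I am not sure whether that is the right fix:

    // Time here is org.apache.flink.api.common.time.Time.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,                                // restart attempts
            Time.of(10, TimeUnit.SECONDS)));  // delay between attempts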

My job keeps failing for the same reason; it says, "The heartbeat of TaskManager with id container_xxx timed out." Is there a way to configure it not to time out?

I would appreciate any advice on how I should solve these problems. Thank you.



Re: Questions Flink DataStream in BATCH execution mode scalability advice

Yun Gao
Hi Marco,

For the remaining issues, 

1. For the aggregation, the 500 GB of files do not need to fit into memory.
Roughly speaking, for keyBy().window().reduce() the input records are first
sorted by key (time_series.name) via an external sort, which consumes only a
fixed amount of memory. Then, for the records of each key, Flink uses a special
single-key state backend to hold the intermediate result for that key, whose
memory consumption can usually be ignored.
2. The first operator (namely the source) reads the files directly and emits
records to the downstream operators, so there should be no intermediate result
before the first operator.
3. I am afraid Flink currently does not support using S3 to store intermediate
results, since it relies on local I/O mechanisms like mmap and local file
read/write, which S3 does not support. EBS should be fine.
4. The heartbeat timeout usually happens because an Akka thread is blocked or because of
network issues. To check whether a thread is blocked, first look at the GC log for long full
GCs; if there are none, take a thread dump and check whether the JM or TM Akka thread is
blocked. If it looks like a network issue, the job could increase heartbeat.timeout [1], for
example as shown below.
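
For example, in flink-conf.yaml (the timeout below is only an illustration; the defaults are heartbeat.interval: 10000 and heartbeat.timeout: 50000, both in milliseconds):

    heartbeat.interval: 10000
    heartbeat.timeout: 300000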

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout



Re: Questions Flink DataStream in BATCH execution mode scalability advice

Marco Villalobos-2


> On May 19, 2021, at 7:26 AM, Yun Gao <[hidden email]> wrote:
>
> Hi Marco,
>
> For the remaining issues,
>
> 1. For the aggregation, the 500 GB of files do not need to fit into memory.
> Roughly speaking, for keyBy().window().reduce() the input records are first
> sorted by key (time_series.name) via an external sort, which consumes only a
> fixed amount of memory. Then, for the records of each key, Flink uses a special
> single-key state backend to hold the intermediate result for that key, whose
> memory consumption can usually be ignored.

Thank you. However, if the source of data is S3, does this mean that the machine must have local disk space to store the files from S3 and any intermediate files?

Considering that the source of data is 500 GB, doesn't this mean that the machine will need at least 500 GB of disk space?



Re: Re: Questions Flink DataStream in BATCH execution mode scalability advice

Yun Gao
Hi Marco,

I think Flink does not need 500 GB of disk for the source; the source should
be able to read from S3 in a streaming fashion (namely, open the file,
create an input stream, and fetch data as required).

But it might indeed need disk space for the intermediate data between
operators and for the sort. The amount of space depends on how the operators
scale the data volume up or down. Another point is that if the parallelism
is greater than 1, each single machine should require less disk space.
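
For example (only an illustration; the path is hypothetical, and on YARN the temporary directories may already be managed for you), you could point the spill directories at a volume with enough space and raise the default parallelism in flink-conf.yaml:

    io.tmp.dirs: /mnt/large-ebs/flink-tmp
    parallelism.default: 8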

Best,
Yun

