Flink s3 streaming performance

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink s3 streaming performance

venkata sateesh` kolluru
Hello,

I have posted the same in stackoverflow but didnt get any response. So posting it here for help.


Details:

I am working on a flink application on kubernetes(eks) which consumes data from kafka and write it to s3.

We have around 120 million xml messages of size 4TB in kafka. Consuming from kafka is super fast.

These are just string messages from kafka. 

There is a high back pressure while writing to s3. We are not even hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing only 300 writes per minute to S3 which is very slow.

I am using StreamFileSink to write to s3 with Rolling policy as OnCheckpointPolicy.

Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)

Other than this I dont have any config related to s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Is there anything that we can optimize on s3 from streamfilesink or in flink-conf.xml ?

Like using bulkformat or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p or s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);

Reply | Threaded
Open this post in threaded view
|

Re: Flink s3 streaming performance

David Magalhães
Hi Venkata. 

300 requests per minute look like a 200ms per request, which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelization to be higher than 1? I also recommend to limit the source parallelization, because it can consume pretty fast from Kafka and create some kind of backpressure.

I don't any much experience with StreamingFileSink, because I've ended up using a custom S3Sink, but I did have some issues writing to S3 because the request wasn't parallelised. Check this thread, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hello,

I have posted the same in stackoverflow but didnt get any response. So posting it here for help.


Details:

I am working on a flink application on kubernetes(eks) which consumes data from kafka and write it to s3.

We have around 120 million xml messages of size 4TB in kafka. Consuming from kafka is super fast.

These are just string messages from kafka. 

There is a high back pressure while writing to s3. We are not even hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing only 300 writes per minute to S3 which is very slow.

I am using StreamFileSink to write to s3 with Rolling policy as OnCheckpointPolicy.

Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)

Other than this I dont have any config related to s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Is there anything that we can optimize on s3 from streamfilesink or in flink-conf.xml ?

Like using bulkformat or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p or s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);

Reply | Threaded
Open this post in threaded view
|

Re: Flink s3 streaming performance

venkata sateesh` kolluru
Hi David,

The avg size of each file is around 30KB and I have checkpoint interval of 5 minutes. Some files are even 1 kb, because of checkpoint some files are merged into 1 big file around 300MB.

With 120 million files and 4Tb, if the rate of transfer is 300 per minute, it is taking weeks to write to s3.

I have tried to increase parallelism of sink but I dont see any improvement. 

The sink record is Tuple3<String,String,String>, the actual content of file is f2. This is content is written to <s3 bucket>/f0/f1/part*-* 

I guess the prefix determination in custombucketassigner wont be causing this delay?

Could you please shed some light on writing custom s3 sink ?

Thanks


On Sun, May 31, 2020, 6:34 AM David Magalhães <[hidden email]> wrote:
Hi Venkata. 

300 requests per minute look like a 200ms per request, which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelization to be higher than 1? I also recommend to limit the source parallelization, because it can consume pretty fast from Kafka and create some kind of backpressure.

I don't any much experience with StreamingFileSink, because I've ended up using a custom S3Sink, but I did have some issues writing to S3 because the request wasn't parallelised. Check this thread, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hello,

I have posted the same in stackoverflow but didnt get any response. So posting it here for help.


Details:

I am working on a flink application on kubernetes(eks) which consumes data from kafka and write it to s3.

We have around 120 million xml messages of size 4TB in kafka. Consuming from kafka is super fast.

These are just string messages from kafka. 

There is a high back pressure while writing to s3. We are not even hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing only 300 writes per minute to S3 which is very slow.

I am using StreamFileSink to write to s3 with Rolling policy as OnCheckpointPolicy.

Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)

Other than this I dont have any config related to s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Is there anything that we can optimize on s3 from streamfilesink or in flink-conf.xml ?

Like using bulkformat or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p or s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);

Reply | Threaded
Open this post in threaded view
|

Re: Flink s3 streaming performance

Jörn Franke
I think S3 is a wrong storage backend for this volumes of small messages. 
Try to use a NoSQL database or write multiple messages into one file in S3 (10000 or 100000)

If you still want to go with your scenario then try a network optimized instance and use s3a in Flink and configure s3 entropy.

Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <[hidden email]>:


Hi David,

The avg size of each file is around 30KB and I have checkpoint interval of 5 minutes. Some files are even 1 kb, because of checkpoint some files are merged into 1 big file around 300MB.

With 120 million files and 4Tb, if the rate of transfer is 300 per minute, it is taking weeks to write to s3.

I have tried to increase parallelism of sink but I dont see any improvement. 

The sink record is Tuple3<String,String,String>, the actual content of file is f2. This is content is written to <s3 bucket>/f0/f1/part*-* 

I guess the prefix determination in custombucketassigner wont be causing this delay?

Could you please shed some light on writing custom s3 sink ?

Thanks


On Sun, May 31, 2020, 6:34 AM David Magalhães <[hidden email]> wrote:
Hi Venkata. 

300 requests per minute look like a 200ms per request, which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelization to be higher than 1? I also recommend to limit the source parallelization, because it can consume pretty fast from Kafka and create some kind of backpressure.

I don't any much experience with StreamingFileSink, because I've ended up using a custom S3Sink, but I did have some issues writing to S3 because the request wasn't parallelised. Check this thread, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hello,

I have posted the same in stackoverflow but didnt get any response. So posting it here for help.


Details:

I am working on a flink application on kubernetes(eks) which consumes data from kafka and write it to s3.

We have around 120 million xml messages of size 4TB in kafka. Consuming from kafka is super fast.

These are just string messages from kafka. 

There is a high back pressure while writing to s3. We are not even hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing only 300 writes per minute to S3 which is very slow.

I am using StreamFileSink to write to s3 with Rolling policy as OnCheckpointPolicy.

Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)

Other than this I dont have any config related to s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Is there anything that we can optimize on s3 from streamfilesink or in flink-conf.xml ?

Like using bulkformat or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p or s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);

Reply | Threaded
Open this post in threaded view
|

Re: Flink s3 streaming performance

Arvid Heise-3
Hi Venkata,

are the many small files intended or is it rather an issue of our commit on checkpointing? If so then FLINK-11499 [1] should help you. Design is close to done, unfortunately implementation will not make it into 1.11.


Lastly, the way you output elements looks also a bit suspicious. PrintStream is not known for great performance. I'm also surprised that it works without manual flushing.


On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <[hidden email]> wrote:
I think S3 is a wrong storage backend for this volumes of small messages. 
Try to use a NoSQL database or write multiple messages into one file in S3 (10000 or 100000)

If you still want to go with your scenario then try a network optimized instance and use s3a in Flink and configure s3 entropy.

Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <[hidden email]>:


Hi David,

The avg size of each file is around 30KB and I have checkpoint interval of 5 minutes. Some files are even 1 kb, because of checkpoint some files are merged into 1 big file around 300MB.

With 120 million files and 4Tb, if the rate of transfer is 300 per minute, it is taking weeks to write to s3.

I have tried to increase parallelism of sink but I dont see any improvement. 

The sink record is Tuple3<String,String,String>, the actual content of file is f2. This is content is written to <s3 bucket>/f0/f1/part*-* 

I guess the prefix determination in custombucketassigner wont be causing this delay?

Could you please shed some light on writing custom s3 sink ?

Thanks


On Sun, May 31, 2020, 6:34 AM David Magalhães <[hidden email]> wrote:
Hi Venkata. 

300 requests per minute look like a 200ms per request, which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelization to be higher than 1? I also recommend to limit the source parallelization, because it can consume pretty fast from Kafka and create some kind of backpressure.

I don't any much experience with StreamingFileSink, because I've ended up using a custom S3Sink, but I did have some issues writing to S3 because the request wasn't parallelised. Check this thread, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hello,

I have posted the same in stackoverflow but didnt get any response. So posting it here for help.


Details:

I am working on a flink application on kubernetes(eks) which consumes data from kafka and write it to s3.

We have around 120 million xml messages of size 4TB in kafka. Consuming from kafka is super fast.

These are just string messages from kafka. 

There is a high back pressure while writing to s3. We are not even hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing only 300 writes per minute to S3 which is very slow.

I am using StreamFileSink to write to s3 with Rolling policy as OnCheckpointPolicy.

Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)

Other than this I dont have any config related to s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Is there anything that we can optimize on s3 from streamfilesink or in flink-conf.xml ?

Like using bulkformat or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p or s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Reply | Threaded
Open this post in threaded view
|

Re: Flink s3 streaming performance

Kostas Kloudas-2
Hi all,

@Venkata, Do you have many small files being created as Arvid suggested? If yes, then I tend to agree that S3 is probably not the best sink. Although I did not get that from your description.
In addition, instead of PrintStream you can have a look at the code of the SimpleStringEncoder in Flink [1] for a bit more efficient implementation.

Cheers,
Kostas



On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <[hidden email]> wrote:
Hi Venkata,

are the many small files intended or is it rather an issue of our commit on checkpointing? If so then FLINK-11499 [1] should help you. Design is close to done, unfortunately implementation will not make it into 1.11.


Lastly, the way you output elements looks also a bit suspicious. PrintStream is not known for great performance. I'm also surprised that it works without manual flushing.


On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <[hidden email]> wrote:
I think S3 is a wrong storage backend for this volumes of small messages. 
Try to use a NoSQL database or write multiple messages into one file in S3 (10000 or 100000)

If you still want to go with your scenario then try a network optimized instance and use s3a in Flink and configure s3 entropy.

Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <[hidden email]>:


Hi David,

The avg size of each file is around 30KB and I have checkpoint interval of 5 minutes. Some files are even 1 kb, because of checkpoint some files are merged into 1 big file around 300MB.

With 120 million files and 4Tb, if the rate of transfer is 300 per minute, it is taking weeks to write to s3.

I have tried to increase parallelism of sink but I dont see any improvement. 

The sink record is Tuple3<String,String,String>, the actual content of file is f2. This is content is written to <s3 bucket>/f0/f1/part*-* 

I guess the prefix determination in custombucketassigner wont be causing this delay?

Could you please shed some light on writing custom s3 sink ?

Thanks


On Sun, May 31, 2020, 6:34 AM David Magalhães <[hidden email]> wrote:
Hi Venkata. 

300 requests per minute look like a 200ms per request, which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelization to be higher than 1? I also recommend to limit the source parallelization, because it can consume pretty fast from Kafka and create some kind of backpressure.

I don't any much experience with StreamingFileSink, because I've ended up using a custom S3Sink, but I did have some issues writing to S3 because the request wasn't parallelised. Check this thread, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hello,

I have posted the same in stackoverflow but didnt get any response. So posting it here for help.


Details:

I am working on a flink application on kubernetes(eks) which consumes data from kafka and write it to s3.

We have around 120 million xml messages of size 4TB in kafka. Consuming from kafka is super fast.

These are just string messages from kafka. 

There is a high back pressure while writing to s3. We are not even hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing only 300 writes per minute to S3 which is very slow.

I am using StreamFileSink to write to s3 with Rolling policy as OnCheckpointPolicy.

Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)

Other than this I dont have any config related to s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Is there anything that we can optimize on s3 from streamfilesink or in flink-conf.xml ?

Like using bulkformat or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p or s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Reply | Threaded
Open this post in threaded view
|

Re: Flink s3 streaming performance

venkata sateesh` kolluru
Hi Kostas and Arvid,

Thanks for your suggestions. 

The small files were already created and I am trying to roll few into a big file while sinking. But due to the custom bucket assigner, it is hard getting more files with in the same prefix in specified checkinpointing time.

For example:
<bucket>/prefix1/prefix2/YY/MM/DD/HH  is our structure in s3.
checkpointing interval is 5 minutes. prefix1 has 40 different values and prefix 2 has 10000+ values
With in the 5 minute interval, we are able to get part file size in these prefixes not more than 5-10 files.

Regarding printstream, will figure out how to use SimpleStringEncoder on a Tuple as I only need to write tuple.f2 element in the file. If you can guide me on how to do it, I would appreciate it.

Will try Arvid suggestion on increasing fs.s3a.connection.maximum . I was trying to find about these parameters and could find anywhere. Is there a place that I could look at these config params list ?


On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas <[hidden email]> wrote:
Hi all,

@Venkata, Do you have many small files being created as Arvid suggested? If yes, then I tend to agree that S3 is probably not the best sink. Although I did not get that from your description.
In addition, instead of PrintStream you can have a look at the code of the SimpleStringEncoder in Flink [1] for a bit more efficient implementation.

Cheers,
Kostas



On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <[hidden email]> wrote:
Hi Venkata,

are the many small files intended or is it rather an issue of our commit on checkpointing? If so then FLINK-11499 [1] should help you. Design is close to done, unfortunately implementation will not make it into 1.11.


Lastly, the way you output elements looks also a bit suspicious. PrintStream is not known for great performance. I'm also surprised that it works without manual flushing.


On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <[hidden email]> wrote:
I think S3 is a wrong storage backend for this volumes of small messages. 
Try to use a NoSQL database or write multiple messages into one file in S3 (10000 or 100000)

If you still want to go with your scenario then try a network optimized instance and use s3a in Flink and configure s3 entropy.

Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <[hidden email]>:


Hi David,

The avg size of each file is around 30KB and I have checkpoint interval of 5 minutes. Some files are even 1 kb, because of checkpoint some files are merged into 1 big file around 300MB.

With 120 million files and 4Tb, if the rate of transfer is 300 per minute, it is taking weeks to write to s3.

I have tried to increase parallelism of sink but I dont see any improvement. 

The sink record is Tuple3<String,String,String>, the actual content of file is f2. This is content is written to <s3 bucket>/f0/f1/part*-* 

I guess the prefix determination in custombucketassigner wont be causing this delay?

Could you please shed some light on writing custom s3 sink ?

Thanks


On Sun, May 31, 2020, 6:34 AM David Magalhães <[hidden email]> wrote:
Hi Venkata. 

300 requests per minute look like a 200ms per request, which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelization to be higher than 1? I also recommend to limit the source parallelization, because it can consume pretty fast from Kafka and create some kind of backpressure.

I don't any much experience with StreamingFileSink, because I've ended up using a custom S3Sink, but I did have some issues writing to S3 because the request wasn't parallelised. Check this thread, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hello,

I have posted the same in stackoverflow but didnt get any response. So posting it here for help.


Details:

I am working on a flink application on kubernetes(eks) which consumes data from kafka and write it to s3.

We have around 120 million xml messages of size 4TB in kafka. Consuming from kafka is super fast.

These are just string messages from kafka. 

There is a high back pressure while writing to s3. We are not even hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing only 300 writes per minute to S3 which is very slow.

I am using StreamFileSink to write to s3 with Rolling policy as OnCheckpointPolicy.

Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)

Other than this I dont have any config related to s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Is there anything that we can optimize on s3 from streamfilesink or in flink-conf.xml ?

Like using bulkformat or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p or s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Reply | Threaded
Open this post in threaded view
|

Re: Flink s3 streaming performance

Arvid Heise-3
Hi Venkata,

you can find them on the Hadoop AWS page (we are just using it as a library) [1].


On Sat, Jun 6, 2020 at 1:26 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hi Kostas and Arvid,

Thanks for your suggestions. 

The small files were already created and I am trying to roll few into a big file while sinking. But due to the custom bucket assigner, it is hard getting more files with in the same prefix in specified checkinpointing time.

For example:
<bucket>/prefix1/prefix2/YY/MM/DD/HH  is our structure in s3.
checkpointing interval is 5 minutes. prefix1 has 40 different values and prefix 2 has 10000+ values
With in the 5 minute interval, we are able to get part file size in these prefixes not more than 5-10 files.

Regarding printstream, will figure out how to use SimpleStringEncoder on a Tuple as I only need to write tuple.f2 element in the file. If you can guide me on how to do it, I would appreciate it.

Will try Arvid suggestion on increasing fs.s3a.connection.maximum . I was trying to find about these parameters and could find anywhere. Is there a place that I could look at these config params list ?


On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas <[hidden email]> wrote:
Hi all,

@Venkata, Do you have many small files being created as Arvid suggested? If yes, then I tend to agree that S3 is probably not the best sink. Although I did not get that from your description.
In addition, instead of PrintStream you can have a look at the code of the SimpleStringEncoder in Flink [1] for a bit more efficient implementation.

Cheers,
Kostas



On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <[hidden email]> wrote:
Hi Venkata,

are the many small files intended or is it rather an issue of our commit on checkpointing? If so then FLINK-11499 [1] should help you. Design is close to done, unfortunately implementation will not make it into 1.11.


Lastly, the way you output elements looks also a bit suspicious. PrintStream is not known for great performance. I'm also surprised that it works without manual flushing.


On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <[hidden email]> wrote:
I think S3 is a wrong storage backend for this volumes of small messages. 
Try to use a NoSQL database or write multiple messages into one file in S3 (10000 or 100000)

If you still want to go with your scenario then try a network optimized instance and use s3a in Flink and configure s3 entropy.

Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <[hidden email]>:


Hi David,

The avg size of each file is around 30KB and I have checkpoint interval of 5 minutes. Some files are even 1 kb, because of checkpoint some files are merged into 1 big file around 300MB.

With 120 million files and 4Tb, if the rate of transfer is 300 per minute, it is taking weeks to write to s3.

I have tried to increase parallelism of sink but I dont see any improvement. 

The sink record is Tuple3<String,String,String>, the actual content of file is f2. This is content is written to <s3 bucket>/f0/f1/part*-* 

I guess the prefix determination in custombucketassigner wont be causing this delay?

Could you please shed some light on writing custom s3 sink ?

Thanks


On Sun, May 31, 2020, 6:34 AM David Magalhães <[hidden email]> wrote:
Hi Venkata. 

300 requests per minute look like a 200ms per request, which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelization to be higher than 1? I also recommend to limit the source parallelization, because it can consume pretty fast from Kafka and create some kind of backpressure.

I don't any much experience with StreamingFileSink, because I've ended up using a custom S3Sink, but I did have some issues writing to S3 because the request wasn't parallelised. Check this thread, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hello,

I have posted the same in stackoverflow but didnt get any response. So posting it here for help.


Details:

I am working on a flink application on kubernetes(eks) which consumes data from kafka and write it to s3.

We have around 120 million xml messages of size 4TB in kafka. Consuming from kafka is super fast.

These are just string messages from kafka. 

There is a high back pressure while writing to s3. We are not even hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing only 300 writes per minute to S3 which is very slow.

I am using StreamFileSink to write to s3 with Rolling policy as OnCheckpointPolicy.

Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)

Other than this I dont have any config related to s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Is there anything that we can optimize on s3 from streamfilesink or in flink-conf.xml ?

Like using bulkformat or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p or s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Reply | Threaded
Open this post in threaded view
|

Re: Flink s3 streaming performance

venkata sateesh` kolluru
Thanks Arvid!

Will try to increase the property you recommended and will post the update. 

On Sat, Jun 6, 2020, 7:33 AM Arvid Heise <[hidden email]> wrote:
Hi Venkata,

you can find them on the Hadoop AWS page (we are just using it as a library) [1].


On Sat, Jun 6, 2020 at 1:26 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hi Kostas and Arvid,

Thanks for your suggestions. 

The small files were already created and I am trying to roll few into a big file while sinking. But due to the custom bucket assigner, it is hard getting more files with in the same prefix in specified checkinpointing time.

For example:
<bucket>/prefix1/prefix2/YY/MM/DD/HH  is our structure in s3.
checkpointing interval is 5 minutes. prefix1 has 40 different values and prefix 2 has 10000+ values
With in the 5 minute interval, we are able to get part file size in these prefixes not more than 5-10 files.

Regarding printstream, will figure out how to use SimpleStringEncoder on a Tuple as I only need to write tuple.f2 element in the file. If you can guide me on how to do it, I would appreciate it.

Will try Arvid suggestion on increasing fs.s3a.connection.maximum . I was trying to find about these parameters and could find anywhere. Is there a place that I could look at these config params list ?


On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas <[hidden email]> wrote:
Hi all,

@Venkata, Do you have many small files being created as Arvid suggested? If yes, then I tend to agree that S3 is probably not the best sink. Although I did not get that from your description.
In addition, instead of PrintStream you can have a look at the code of the SimpleStringEncoder in Flink [1] for a bit more efficient implementation.

Cheers,
Kostas



On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <[hidden email]> wrote:
Hi Venkata,

are the many small files intended or is it rather an issue of our commit on checkpointing? If so then FLINK-11499 [1] should help you. Design is close to done, unfortunately implementation will not make it into 1.11.


Lastly, the way you output elements looks also a bit suspicious. PrintStream is not known for great performance. I'm also surprised that it works without manual flushing.


On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <[hidden email]> wrote:
I think S3 is a wrong storage backend for this volumes of small messages. 
Try to use a NoSQL database or write multiple messages into one file in S3 (10000 or 100000)

If you still want to go with your scenario then try a network optimized instance and use s3a in Flink and configure s3 entropy.

Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <[hidden email]>:


Hi David,

The avg size of each file is around 30KB and I have checkpoint interval of 5 minutes. Some files are even 1 kb, because of checkpoint some files are merged into 1 big file around 300MB.

With 120 million files and 4Tb, if the rate of transfer is 300 per minute, it is taking weeks to write to s3.

I have tried to increase parallelism of sink but I dont see any improvement. 

The sink record is Tuple3<String,String,String>, the actual content of file is f2. This is content is written to <s3 bucket>/f0/f1/part*-* 

I guess the prefix determination in custombucketassigner wont be causing this delay?

Could you please shed some light on writing custom s3 sink ?

Thanks


On Sun, May 31, 2020, 6:34 AM David Magalhães <[hidden email]> wrote:
Hi Venkata. 

300 requests per minute look like a 200ms per request, which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelization to be higher than 1? I also recommend to limit the source parallelization, because it can consume pretty fast from Kafka and create some kind of backpressure.

I don't any much experience with StreamingFileSink, because I've ended up using a custom S3Sink, but I did have some issues writing to S3 because the request wasn't parallelised. Check this thread, http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <[hidden email]> wrote:
Hello,

I have posted the same in stackoverflow but didnt get any response. So posting it here for help.


Details:

I am working on a flink application on kubernetes(eks) which consumes data from kafka and write it to s3.

We have around 120 million xml messages of size 4TB in kafka. Consuming from kafka is super fast.

These are just string messages from kafka. 

There is a high back pressure while writing to s3. We are not even hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing only 300 writes per minute to S3 which is very slow.

I am using StreamFileSink to write to s3 with Rolling policy as OnCheckpointPolicy.

Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)

Other than this I dont have any config related to s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Is there anything that we can optimize on s3 from streamfilesink or in flink-conf.xml ?

Like using bulkformat or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p or s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng