Help needed to increase throughput of simple flink app

Help needed to increase throughput of simple flink app

ashwinkonale
Hey guys,
I am struggling to improve the throughput of my simple flink application. The target topology is this.

read_from_kafka(byte array deserializer) --rescale--> processFunction(confluent avro deserialization) -> split -> 1. data_sink,2.dlq_sink

Kafka traffic is pretty high:
Partitions: 128
Traffic: ~500k msg/s, 50Mbps.

Flink is running on k8s, writing to hdfs3. I have ~200 CPUs and 400G of memory at hand. I have tried a few configurations but I am not able to get throughput above 1mil messages per second (which I need for recovering from failures). I have tried increasing parallelism a lot (up to 512), but it has very little impact on the throughput. The primary metrics I am watching for throughput are the kafka-source numRecordsOut and the message backlog. I have already increased Kafka consumer defaults like max.poll.records etc. Here are a few things I have tried already.
Try0: Check raw kafka consumer throughput (kafka_source -> discarding_sink)
tm: 20, slots:4, parallelism 80
throughput: 10Mil/s

Try1: Disable chaining to introduce a network exchange.
tm: 20, slots:4, parallelism 80
throughput: 1Mil/s
Also tried increasing floating-buffers to 100 and buffers-per-channel to 64. Increasing parallelism seems to have no effect.
Observation: out/in buffers are always at 100% utilization.

After this I have tried various combinations of network configs, parallelism, JVM sizes etc., but throughput seems to be stuck at 1mil/s. Can someone please help me figure out which key metrics to look at and how I can improve the situation? Happy to provide any details needed.

Flink version: 1.11.2
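
For reference, a minimal sketch of what such a topology looks like in the Flink 1.11 DataStream API. The topic name, Kafka properties, the decode() placeholder and the DiscardingSink stand-ins are assumptions for illustration only, not the actual job:

import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class PipelineSketch {

    // Side output for records that fail Avro deserialization (the dlq branch).
    private static final OutputTag<byte[]> DLQ = new OutputTag<byte[]>("dlq") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "throughput-sketch");   // placeholder

        // 1. Read raw bytes from the 128-partition topic.
        FlinkKafkaConsumer<byte[]> source = new FlinkKafkaConsumer<>(
                "input-topic",                                 // placeholder topic name
                new AbstractDeserializationSchema<byte[]>() {
                    @Override
                    public byte[] deserialize(byte[] message) {
                        return message;                        // hand the bytes through untouched
                    }
                },
                props);

        // 2. rescale() breaks the chain and redistributes records to the decoding stage.
        SingleOutputStreamOperator<GenericRecord> decoded = env
                .addSource(source)
                .rescale()
                .process(new ProcessFunction<byte[], GenericRecord>() {
                    @Override
                    public void processElement(byte[] value, Context ctx, Collector<GenericRecord> out) {
                        try {
                            out.collect(decode(value));        // schema-registry decoding goes here
                        } catch (Exception e) {
                            ctx.output(DLQ, value);            // undecodable record -> DLQ branch
                        }
                    }
                });
        // Note: without an explicit Avro type info, Flink treats GenericRecord as a
        // generic (Kryo) type here -- this becomes relevant later in the thread.

        // 3. Split: good records to the data sink, failures to the DLQ sink.
        //    DiscardingSink stands in for the real HDFS / DLQ sinks.
        decoded.addSink(new DiscardingSink<>());
        decoded.getSideOutput(DLQ).addSink(new DiscardingSink<>());

        env.execute("throughput-sketch");
    }

    // Placeholder for the Confluent schema-registry decoding (discussed later in the thread).
    private static GenericRecord decode(byte[] value) {
        throw new UnsupportedOperationException("plug in Avro decoding here");
    }
}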

Re: Help needed to increase throughput of simple flink app

Till Rohrmann
Hi Ashwin,

Thanks for reaching out to the Flink community. Since you have tested that a kafka_source -> discarding_sink can process 10 million records/s, you might also want to test the write throughput to data_sink and dlq_sink. Maybe these sinks are limiting your overall throughput by backpressuring the data flow. If this is not the problem, then I believe that some profiling could help pinpoint the bottleneck.

Cheers,
Till
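
A rough way to test the sink suggestion, assuming a StreamingFileSink to HDFS since the job writes to hdfs3 (path and encoder are placeholders): wire the raw Kafka bytes straight into the sink, skipping the Avro stage. If throughput collapses here too, the sinks (or HDFS) are the bottleneck; if it stays near the 10Mil/s from Try0, the bottleneck is elsewhere.

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class SinkOnlyTest {

    // Attach the real sink to the raw byte[] stream coming out of the Kafka source.
    static void attachRawSink(DataStream<byte[]> fromKafka) {
        StreamingFileSink<byte[]> rawSink = StreamingFileSink
                .forRowFormat(
                        new Path("hdfs:///tmp/sink-throughput-test"),            // placeholder path
                        (Encoder<byte[]>) (element, stream) -> stream.write(element))
                .build();
        fromKafka.addSink(rawSink);
    }
}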

Re: Help needed to increase throughput of simple flink app

ashwinkonale
Hi Till,
Thanks a lot for the reply. The problem I am facing is that as soon as I add a
network exchange (by removing chaining) before the discarding sink, throughput
drops drastically. Do you have any pointers on how I can go about debugging this?

- Ashwin

Re: Help needed to increase throughput of simple flink app

Kostas Kloudas-2
Hi Ashwin,

Do you have any filtering or aggregation (or any operation that emits
less data than it receives) in your logic? If yes, you could for
example put it before the rescaling operation so that it gets chained
to your source and you reduce the amount of data you ship through the
network. After that, it boils down to optimizing your code I guess, as
Till said. You can also check whether the rescaling has any effect; if
not, you could remove it.

Kostas
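
A minimal sketch of that idea (the filter condition is a made-up example): anything filtered before rescale() stays chained to the Kafka source, so dropped records never cross the network.

import org.apache.flink.streaming.api.datastream.DataStream;

public class EarlyFilterSketch {

    // `raw` is the byte[] stream straight out of the Kafka source.
    static DataStream<byte[]> filterBeforeRescale(DataStream<byte[]> raw) {
        return raw
                .filter(bytes -> bytes != null && bytes.length > 0) // chained to the source task
                .rescale();                                         // only the survivors are shipped
    }
}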

Re: Help needed to increase throughput of simple flink app

ashwinkonale
Hi,
Thanks a lot for the reply. I added some more metrics to the pipeline to
understand the bottleneck. It seems like Avro deserialization introduces some
delay. Using a histogram I found that processing a single message takes
~300us (p99) / ~180us (p50), which means a single slot can output at most
~3000 messages per second. This essentially means that to support a QPS of
3mil/s I will need a parallelism of 1000. Is my understanding correct? Can I
do anything else apart from having so many slots in my job cluster? Also, do
you have any guides or pointers on how to do such setups, e.g. a large number
of taskmanagers with smaller slots, or bigger TMs with many slots and bigger
JVMs, larger network buffers etc.?
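
For reference, a sketch of how such a per-record histogram can be registered (assuming flink-metrics-dropwizard is on the classpath; decode() is again a placeholder for the Avro work). At ~300us per record one slot tops out around 1s / 300us ≈ 3,300 records/s, which is where the ~1000-slot estimate for 3mil/s comes from.

import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TimedDecodeFunction extends ProcessFunction<byte[], GenericRecord> {

    private transient Histogram decodeMicros;

    @Override
    public void open(Configuration parameters) {
        // Exposed as an operator metric, e.g. <operator>.decodeMicros (p50/p99/...).
        decodeMicros = getRuntimeContext().getMetricGroup().histogram(
                "decodeMicros",
                new DropwizardHistogramWrapper(
                        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
    }

    @Override
    public void processElement(byte[] value, Context ctx, Collector<GenericRecord> out) throws Exception {
        long start = System.nanoTime();
        GenericRecord record = decode(value);                  // placeholder for the Avro decoding
        decodeMicros.update((System.nanoTime() - start) / 1_000);
        out.collect(record);
    }

    private GenericRecord decode(byte[] value) {
        throw new UnsupportedOperationException("plug in Avro decoding here");
    }
}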

Re: Help needed to increase throughput of simple flink app

Jaffe, Julian
One thing to check is how much you're serializing to the network. If you're using Avro GenericRecords without special handling, you can wind up serializing the schema with every record, greatly increasing the amount of data you're sending across the wire.
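
In Flink, that special handling usually means giving the stream an explicit Avro type via GenericRecordAvroTypeInfo (from flink-avro), so Flink's Avro serializer is used instead of Kryo serializing each record together with its schema. A minimal sketch, assuming a single known schema:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public class AvroTypeInfoSketch {

    // Declare the Avro schema of the GenericRecord stream once, up front, so it is
    // never shipped per record between subtasks.
    static DataStream<GenericRecord> withAvroTypeInfo(
            SingleOutputStreamOperator<GenericRecord> decoded, Schema schema) {
        return decoded.returns(new GenericRecordAvroTypeInfo(schema));
    }
}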

Re: Help needed to increase throughput of simple flink app

ashwinkonale
Hey,
I am reading messages with a schema id and using the confluent schema registry to
deserialize to GenericRecord. After this point, the pipeline will have these
objects moving across it. Can you give me some examples of the `special handling
of avro messages` you mentioned?

Re: Help needed to increase throughput of simple flink app

rmetzger0
Hi,
From my experience, serialization contributes a lot to the maximum achievable throughput. I can strongly recommend checking out this blog post, which has a lot of detail on the topic: https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
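
One quick check that goes along with that blog post: disabling generic types makes the job fail at submission wherever a type would silently fall back to Kryo, so serialization hot spots show up immediately. A minimal sketch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerCheck {

    static void failFastOnKryo(StreamExecutionEnvironment env) {
        // Any operator whose type would be handled by Kryo now throws at job submission.
        env.getConfig().disableGenericTypes();
    }
}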

Re: Help needed to increase throughput of simple flink app

ashwinkonale
Hi,
Thanks a lot for the reply. And you are both right. Serializing GenericRecord
without specifying the schema was indeed a HUGE bottleneck in my app. I found
this through JFR analysis and then read the blog post you mentioned. Now I am
able to pump in a lot more data per second (in my test setup at least). I am
going to try this with Kafka.
But it now poses a problem: my app cannot handle schema changes automatically,
since Flink needs to know the schema at startup. If there is a
backward-compatible change upstream, new messages will not be read properly.
Do you know any workarounds for this?

Re: Help needed to increase throughput of simple flink app

Arvid Heise-3
The common solution is to use a schema registry, like the Confluent schema registry [1]. All records carry a small 5-byte prefix that identifies the schema, which the deserializer then fetches from the registry [2]. Here are some resources on how to properly secure the communication if needed [3].
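
For the single-schema case, Flink already ships a registry-aware deserializer in flink-avro-confluent-registry. A minimal sketch (registry URL and reader schema are placeholders):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

public class RegistryDeserializerSketch {

    // Reads the 5-byte prefix, fetches the writer schema from the registry and
    // converts each record to the given reader schema.
    static DeserializationSchema<GenericRecord> forGenericRecords(Schema readerSchema) {
        return ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://schema-registry:8081");
    }
}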

Re: Help needed to increase throughput of simple flink app

ashwinkonale
Hi Arvid,
Thanks a lot for your reply. And yes, we do use the confluent schema registry
extensively. But `ConfluentRegistryAvroDeserializationSchema` expects a reader
schema to be provided. That means it reads the message using the writer schema
and converts it to the reader schema, which is not always what I want. If I
have messages of different schemas in the same topic, I cannot apply
`ConfluentRegistryAvroDeserializationSchema`, correct? I also came across this
question:
<https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without>
I am doing the same thing in my pipeline, providing a custom deserialiser using
the confluentSchemaRegistryClient. So as far as I understood, in this use case
there is no way to tell Flink about the `GenericRecordAvroTypeInfo` of the
GenericRecord which comes out of the source function. Please tell me if my
understanding is correct.

Re: Help needed to increase throughput of simple flink app

Arvid Heise-3
If you follow best practices, then topics should never have different schemas, as you can't enforce schema compatibility. You also have very limited processing capabilities and clumsy workflows attached to them.
If you want to encode different kinds of events, then the common approach is to use some kind of envelope schema where different event types are encoded as optional fields.

If you want to stick with your custom approach, then you probably want to implement your own AvroDeserializationSchema that reuses the existing CachedSchemaCoderProvider. If you check the code of RegistryAvroDeserializationSchema, you will notice that the actual implementation is rather slim.

@Override
public T deserialize(byte[] message) throws IOException {
    checkAvroInitialized();
    getInputStream().setBuffer(message);
    Schema writerSchema = schemaCoder.readSchema(getInputStream());

    GenericDatumReader<T> datumReader = getDatumReader();

    datumReader.setSchema(writerSchema);
    datumReader.setExpected(writerSchema); // <-- the difference

    return datumReader.read(null, getDecoder());
}
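
For context, a rough sketch of such a custom deserializer built directly on Confluent's client rather than on CachedSchemaCoderProvider (class name, registry URL and cache size are made up). It copes with mixed-schema topics because every record is read with its own writer schema, but note getProducedType(): without a fixed schema there is nothing better than a generic type to return, which is exactly the Kryo question raised in the next message.

import java.io.IOException;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class MixedSchemaRegistryDeserializer implements DeserializationSchema<GenericRecord> {

    private final String registryUrl;               // e.g. "http://schema-registry:8081"
    private transient KafkaAvroDeserializer inner;   // from io.confluent:kafka-avro-serializer

    public MixedSchemaRegistryDeserializer(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        if (inner == null) {
            inner = new KafkaAvroDeserializer(new CachedSchemaRegistryClient(registryUrl, 1000));
        }
        // The topic name is not needed for plain Avro decoding, hence the placeholder.
        return (GenericRecord) inner.deserialize("unused-topic", message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // No single schema is known up front, so this ends up as a generic (Kryo) type.
        return TypeInformation.of(GenericRecord.class);
    }
}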

Re: Help needed to increase throughput of simple flink app

ashwinkonale
So in this case, Flink will fall back to the default Kryo serializer, right?

Re: Help needed to increase throughput of simple flink app

Arvid Heise-3
You need to differentiate two serialization abstractions (which I guess you already know). One comes from reading the source, where the DeserializationSchema is used; it translates the bytes from Kafka into something that Flink can handle.

The second serialization occurs within Flink through the TypeSerializer, so that Flink can pass data from one subtask to another. That's why your custom DeserializationSchema would need to provide TypeInformation, which allows Flink to pick the TypeSerializer.

Now you would probably not be able to provide a consistent TypeInformation for arbitrary types, so Flink has to fall back to Kryo, as you said. A solution is to also provide a custom TypeSerializer that uses the schema registry (I wouldn't go down the route of GenericRecords with schema again).

Note that because of the missing TypeInformation, you will never be able to use the Table API or SQL. If you ask me, that's quite a few drawbacks coming from that approach (no schema enforcement, no proper schema evolution support, no schema compatibility enforcement, custom serializers, and clumsy code using lots of string-based field accesses and casts).

---

I forgot to highlight another rather simple approach that works quite well for very generic workflows with few operations: use byte[]. The DeserializationSchema then is as trivial as it sounds. You pass byte[] all along until you reach your FlatMap (assuming you are doing some filtering/validation), and only inside this flatmap do you deserialize into Avro, apply your custom logic, and serialize back into byte[]. You can use Table API / SQL later on with UDFs that do the same thing. Using byte[] as the internal serialization format of Flink is also blazingly fast (there is not much to do except adding a header). The only downside is that you need to deserialize manually in each operator, but that can usually be factored out.
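
A minimal sketch of this byte[]-all-the-way pattern, simplified to a single fixed schema (a real job would strip the registry prefix and look up the writer schema per record; the "id" field check just stands in for the custom logic):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class AvroInsideFlatMap extends RichFlatMapFunction<byte[], byte[]> {

    private final String schemaJson;   // passed as JSON so the function serializes cleanly
    private transient Schema schema;
    private transient GenericDatumReader<GenericRecord> reader;
    private transient GenericDatumWriter<GenericRecord> writer;

    public AvroInsideFlatMap(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    @Override
    public void open(Configuration parameters) {
        schema = new Schema.Parser().parse(schemaJson);
        reader = new GenericDatumReader<>(schema);
        writer = new GenericDatumWriter<>(schema);
    }

    @Override
    public void flatMap(byte[] value, Collector<byte[]> out) throws IOException {
        // 1. Decode only inside this operator; Flink itself only ever ships byte[].
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(value, null);
        GenericRecord record = reader.read(null, decoder);

        // 2. Custom filtering/validation logic; drop the record by returning early.
        if (record.get("id") == null) {   // hypothetical required field
            return;
        }

        // 3. Encode back to byte[] before handing it to downstream operators.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
        writer.write(record, encoder);
        encoder.flush();
        out.collect(bytes.toByteArray());
    }
}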

I'd still recommend looking into using only one schema that captures all events as subschemas.

Re: Help needed to increase throughput of simple flink app

ashwinkonale
Hi Arvid,
Thank you so much for your detailed reply. I think I will go with one schema
per topic, using GenericRecordAvroTypeInfo for the GenericRecords, and not do
any custom magic.
----
The approach of sending records as byte arrays also seems quite interesting.
Right now I am deserializing Avro records so that I can pass them to
StreamingFileSink's AvroWriters (which accept only Avro objects), so that it
merges a bunch of Avro records before dumping them to the sink. This seems
unnecessary to me, since there could be a bulk writer implementation which
could do this at the byte level itself. Do you know of any such implementations?