Hey guys,
I am struggling to improve the throughput of my simple Flink application. The target topology is this (a simplified sketch is at the end of this mail):

read_from_kafka (byte array deserializer) --rescale--> processFunction (Confluent Avro deserialization) -> split -> 1. data_sink, 2. dlq_sink

Kafka traffic is pretty high:
Partitions: 128
Traffic: ~500k msg/s, 50 Mbps

Flink is running on k8s, writing to HDFS 3. I have ~200 CPUs and 400 GB of memory at hand. I have tried a few configurations but I am not able to get the throughput above 1 million messages per second (which I need for recovering from failures). I have tried increasing the parallelism a lot (up to 512), but it has very little impact on the throughput. The primary metrics I am considering for throughput are the Kafka source numRecordsOut and the message backlog. I have already increased Kafka consumer defaults like max.poll.records etc.

Here are the few things I tried already.

Try 0: Check raw Kafka consumer throughput (kafka_source -> discarding_sink)
tm: 20, slots: 4, parallelism: 80
throughput: 10 mil/s

Try 1: Disable chaining to introduce network-related lag.
tm: 20, slots: 4, parallelism: 80
throughput: 1 mil/s
Also tried increasing floating-buffers to 100 and buffers-per-channel to 64. Increasing parallelism seems to have no effect.
Observation: out/in buffers are always at 100% utilization.

After this I have tried various other things with different network configs, parallelism, JVM sizes etc., but throughput seems to be stuck at 1 mil/s. Can someone please help me figure out which key metrics to look for and how I can improve the situation? Happy to provide any details needed.

Flink version: 1.11.2
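A simplified sketch of the job (topic names, group id and the process-function body below are placeholders, not the real code; DiscardingSink stands in for the HDFS sinks):

    // Flink 1.11, DataStream API. Raw bytes are read from Kafka, rescaled, decoded from
    // Confluent Avro inside a ProcessFunction, and split into a data sink and a DLQ sink
    // via a side output.
    import java.util.Properties;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class PipelineSketch {
        static final OutputTag<byte[]> DLQ = new OutputTag<byte[]>("dlq") {};

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092");
            props.setProperty("group.id", "my-group");

            FlinkKafkaConsumer<byte[]> source = new FlinkKafkaConsumer<>(
                "input-topic",
                new AbstractDeserializationSchema<byte[]>() {
                    @Override
                    public byte[] deserialize(byte[] message) {
                        return message; // raw bytes; Avro is decoded later in the ProcessFunction
                    }
                },
                props);

            SingleOutputStreamOperator<GenericRecord> decoded = env
                .addSource(source)
                .rescale()
                .process(new ProcessFunction<byte[], GenericRecord>() {
                    @Override
                    public void processElement(byte[] value, Context ctx, Collector<GenericRecord> out) {
                        // real job: Confluent Avro deserialization; bad records go to ctx.output(DLQ, value)
                    }
                });

            decoded.addSink(new DiscardingSink<>());                     // stands in for data_sink
            decoded.getSideOutput(DLQ).addSink(new DiscardingSink<>());  // stands in for dlq_sink

            env.execute("pipeline-sketch");
        }
    }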
Hi Ashwin,

Thanks for reaching out to the Flink community. Since you have tested that a kafka_source -> discarding_sink can process 10 million records/s, you might also want to test the write throughput to data_sink and dlq_sink. Maybe these sinks are limiting your overall throughput by backpressuring the data flow.

If this is not the problem, then I believe that some profiling could help pinpoint the bottleneck.
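For the first check, one way to isolate the sinks is to feed them from a simple generator source. A rough sketch (the path, record size and row format are placeholders; swap in the actual data_sink / dlq_sink configuration you use):

    // Measures raw write throughput to HDFS by pushing synthetic records into a
    // StreamingFileSink, with no Kafka and no Avro decoding in the way.
    import java.util.Arrays;
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class SinkThroughputTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // so part files actually get finalized

            StreamingFileSink<String> sinkUnderTest = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp/sink-throughput-test"),
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

            env.addSource(new SourceFunction<String>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<String> ctx) {
                        char[] chars = new char[100];
                        Arrays.fill(chars, 'x');
                        String payload = new String(chars); // ~100-byte record, roughly your message size
                        while (running) {
                            ctx.collect(payload);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                .addSink(sinkUnderTest);

            env.execute("sink-throughput-test");
        }
    }

Cheers,
Till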
Hi Till,
Thanks a lot for the reply. The problem I am facing is that as soon as I add a network exchange (remove chaining) before the discarding sink, I have a huge problem with throughput. Do you have any pointers on how I can go about debugging this?

- Ashwin
Hi Ashwin,
Do you have any filtering or aggregation (or any operation that emits less data than it receives) in your logic? If yes, you could for example put it before the rescaling operation so that it gets chained to your source and you reduce the amount of data you ship through the network, as in the sketch below. After that, it boils down to optimizing your code I guess, as Till said. Also, you can check whether the rescaling has any effect, because if not, you could also remove it.
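For example, a rough sketch (the filter is a placeholder for your real record-dropping logic, `kafkaSource` stands for your existing FlinkKafkaConsumer<byte[]>, and `MyDecodingFunction` for your existing ProcessFunction):

    // The filter is chained to the Kafka source, so records it drops never enter the
    // rescale() network exchange; only surviving records are shipped over the network.
    env.addSource(kafkaSource)
       .filter(bytes -> bytes.length > 5)   // placeholder filtering / validation
       .rescale()                           // the network shuffle now only sees filtered data
       .process(new MyDecodingFunction());

Kostas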
Hi,
Thanks a lot for the reply. I added some more metrics to the pipeline to understand the bottleneck. It seems like Avro deserialization introduces some delay. Using a histogram I found that processing a single message takes ~300us (p99) / ~180us (p50), which means a single slot can output at most ~3000 messages per second. This essentially means that to support a QPS of 3 mil/s I will need a parallelism of 1000. Is my understanding correct? Can I do anything else apart from having so many slots in my job cluster? Also, do you have any guides or pointers on how to do such setups, e.g. a large number of task managers with fewer slots, or bigger TMs with many slots, bigger JVMs, larger network buffers, etc.?
One thing to check is how much you're serializing to the network. If you're using Avro Generic records without special handling you can wind up serializing the schema with every record, greatly increasing the amount of data you're sending across the wire.
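In Flink, one way to avoid that for a DataStream of GenericRecord is to hand the schema to Flink once via GenericRecordAvroTypeInfo (org.apache.flink.formats.avro.typeutils, in flink-avro). A sketch, where the schema literal, `rawBytes` and the decoding function are placeholders:

    // With an explicit GenericRecordAvroTypeInfo, Flink serializes records with its Avro
    // serializer instead of a generic fallback that ships the schema with every record.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

    DataStream<GenericRecord> decoded = rawBytes
        .rescale()
        .process(new MyDecodingFunction())
        .returns(new GenericRecordAvroTypeInfo(schema)); // no per-record schema on the wire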
Hey,
I am reading messages with a schema id and using the Confluent schema registry to deserialize them to GenericRecord. After this point, the pipeline will have these objects moving across. Can you give me some examples of the `special handling of avro messages` you mentioned?
Hi,

From my experience, serialization contributes a lot to the maximum achievable throughput. I can strongly recommend checking out this blog post, which has a lot of details on the topic: https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
Hi,
Thanks a lot for the reply. And you both are right: serializing GenericRecord without specifying the schema was indeed a HUGE bottleneck in my app. I got to know it through JFR analysis and then read the blog post you mentioned. Now I am able to pump in a lot more data per second (in my test setup at least). I am going to try this with Kafka. But now it poses me a problem: my app cannot handle schema changes automatically, since at startup Flink needs to know the schema. If there is a backward-compatible change upstream, new messages will not be read properly. Do you know any workarounds for this?
The common solution is to use a schema registry, like the Confluent schema registry [1]. All records have a small 5-byte prefix that identifies the schema, and the schema gets fetched by the deserializer [2]. Here are some resources on how to properly secure the communication if needed [3].
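With Flink's built-in support this looks roughly like the following (topic, registry URL, consumer properties and the reader schema are placeholders; the deserializer comes from the flink-avro-confluent-registry module):

    // ConfluentRegistryAvroDeserializationSchema.forGeneric resolves the writer schema via
    // the 5-byte prefix (magic byte + schema id) and converts records to the reader schema.
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "my-group");

    FlinkKafkaConsumer<GenericRecord> source = new FlinkKafkaConsumer<>(
        "input-topic",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://schema-registry:8081"),
        props);

Arvid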
Hi Arvid,
Thanks a lot for your reply. And yes, we do use the Confluent schema registry extensively. But `ConfluentRegistryAvroDeserializationSchema` expects the reader schema to be provided. That means it reads the message using the writer schema and converts it to the reader schema, which is not always what I want. If I have messages of different schemas in the same topic, I cannot apply `ConfluentRegistryAvroDeserializationSchema`, correct? I also came across this question <https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without>. I am doing the same thing in my pipeline by providing a custom deserialiser using the Confluent schema registry client. So as far as I understand, in this use case there is no way to tell Flink about the `GenericRecordAvroTypeInfo` of the GenericRecord which comes out of the source function. Please tell me if my understanding is correct.
If you follow the best practices, then a topic should never have different schemas, as you can't enforce schema compatibility. You also get very limited processing capabilities and clumsy workflows attached to it.

If you want to encode different kinds of events, then the common approach is to use some kind of envelope schema, where the different event types are encoded as optional fields.

If you want to stick with your custom approach, then you probably want to implement your own AvroDeserializationSchema that reuses the existing CachedSchemaCoderProvider. If you check the code of RegistryAvroDeserializationSchema, you will notice that the actual implementation is rather slim (a rough sketch of such a schema is at the end of this mail).
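A rough sketch of such a custom schema (this one uses the Confluent CachedSchemaRegistryClient directly rather than Flink's internal CachedSchemaCoderProvider; class and field names are just illustrative):

    // Resolves the writer schema per record from the 5-byte Confluent prefix
    // (magic byte + 4-byte schema id) and decodes it into a GenericRecord.
    // Note getProducedType() below is exactly where Flink lacks a proper TypeInformation
    // for "arbitrary schema" records, so downstream serialization falls back to Kryo.
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

    public class RegistryAwareGenericDeserializer implements DeserializationSchema<GenericRecord> {

        private final String registryUrl;
        private transient SchemaRegistryClient client;

        public RegistryAwareGenericDeserializer(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        @Override
        public GenericRecord deserialize(byte[] message) throws IOException {
            if (client == null) {
                client = new CachedSchemaRegistryClient(registryUrl, 1000);
            }
            ByteBuffer buffer = ByteBuffer.wrap(message);
            buffer.get();                   // magic byte
            int schemaId = buffer.getInt(); // 4-byte schema id
            try {
                Schema writerSchema = client.getById(schemaId); // cached after the first lookup
                GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema);
                return reader.read(null,
                    DecoderFactory.get().binaryDecoder(message, 5, message.length - 5, null));
            } catch (Exception e) {
                throw new IOException("Could not deserialize record with schema id " + schemaId, e);
            }
        }

        @Override
        public boolean isEndOfStream(GenericRecord nextElement) {
            return false;
        }

        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            // No single Avro schema to hand to Flink here -> GenericRecord ends up on Kryo.
            return TypeInformation.of(GenericRecord.class);
        }
    }

Arvid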
So in this case, Flink will fall back to the default Kryo serialiser, right?
You need to differentiate two serialization abstractions (which I guess you already know). One is coming from reading the source, where the DeserializationSchema is used; it translates the bytes from Kafka into something that Flink can handle. The second serialization occurs within Flink through the TypeSerializer, such that Flink can pass data from one subtask to another subtask. That's why your custom DeserializationSchema would need to provide TypeInformation, which allows Flink to pick the TypeSerializer.

Now, you would probably not be able to provide a consistent TypeInformation for arbitrary types, and Flink has to fall back to Kryo as you said. A solution is to also provide a custom TypeSerializer that uses the schema registry (I wouldn't go the route of GenericRecords with schema again). Note that because of the missing TypeInformation, you will never be able to use the Table API or SQL. If you ask me, that's quite a few drawbacks coming from that approach (no schema enforcement, no proper schema evolution support, no schema compatibility enforcement, custom serializers, and clumsy code using lots of string-based field accesses and casts).

---

I forgot to highlight another rather simple approach that works quite well for very generic workflows with few operations: use byte[]. The DeserializationSchema then is as trivial as it sounds. You pass byte[] all along until you reach your FlatMap (assuming you are doing some filtering/validation), and only inside this flatmap you deserialize into Avro, do your custom logic, and serialize it again into byte[] (see the sketch at the end of this mail). You can use Table API / SQL later on with UDFs that do the same thing. Using byte[] as the internal serialization format of Flink is also blazingly fast (there is not much to do except adding a header). The only downside is that you need to deserialize manually in each operator, but that can usually be factored out.

I'd still recommend looking into using only one schema that captures all events as subschemas.
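A sketch of the byte[] variant (the validation logic and the way the schema is obtained are placeholders; the payload here is assumed to be plain Avro binary without the registry prefix):

    // Keeps byte[] as Flink's wire format and only deserializes inside the operator
    // that actually needs the Avro fields.
    import java.io.ByteArrayOutputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class ValidateAvroBytes extends RichFlatMapFunction<byte[], byte[]> {

        private final String schemaJson;
        private transient Schema schema;

        public ValidateAvroBytes(String schemaJson) {
            this.schemaJson = schemaJson;
        }

        @Override
        public void open(Configuration parameters) {
            schema = new Schema.Parser().parse(schemaJson);
        }

        @Override
        public void flatMap(byte[] value, Collector<byte[]> out) throws Exception {
            // Deserialize only here, where the fields are actually needed.
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));

            if (record.get("id") == null) { // placeholder validation
                return;                     // drop invalid records (or route them to a DLQ)
            }

            // Serialize back to byte[] so everything downstream stays on the cheap format.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
            out.collect(bos.toByteArray());
        }
    }

Arvid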
Hi Arvid,
Thank you so much for your detailed reply. I think I will go with one schema per topic, using GenericRecordAvroTypeInfo for the GenericRecords, and not do any custom magic.

The approach of sending records as byte arrays also seems quite interesting. Right now I am deserializing the Avro records so that I can pass them to StreamingFileSink's AvroWriters (which accept only Avro objects), so that it merges a bunch of Avro records before dumping them to the sink. It seems unnecessary to me, since there could be some bulk writer implementation which could do this at the byte level itself. Do you know of any such implementations?