Hi,
I'm running Flink v1.9. I backported the commit adding serialization support for Confluent's schema registry[1]. Using the code as is, I saw a nearly 50% drop in peak throughput for my job compared to using AvroRowSerializationSchema.

Looking at the code, RegistryAvroSerializationSchema.serialize() executes the following for every single message:

    public byte[] serialize(T object) {
        checkAvroInitialized();
        if (object == null) {
            return null;
        } else {
            try {
                Encoder encoder = getEncoder();
                schemaCoderProvider.get()
                    .writeSchema(getSchema(), getOutputStream());
                getDatumWriter().write(object, encoder);
                encoder.flush();
                byte[] bytes = getOutputStream().toByteArray();
                getOutputStream().reset();
                return bytes;
            } catch (IOException e) {
                throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
            }
        }
    }

ConfluentSchemaRegistryCoder.writeSchema() attempts to register the schema:

    public void writeSchema(Schema schema, OutputStream out) throws IOException {
        try {
            int registeredId = schemaRegistryClient.register(subject, schema);
            out.write(CONFLUENT_MAGIC_BYTE);
            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
            out.write(schemaIdBytes);
        } catch (RestClientException e) {
            throw new IOException("Could not register schema in registry", e);
        }
    }

So it is making an HTTP request to the Schema Registry for every single message. Since the output schema does not change over the course of a streaming job, it seems you should only need to register the schema once.

I moved the schema registration call into RegistryAvroSerializationSchema.checkAvroInitialized() and added a helper function, called from RegistryAvroSerializationSchema.serialize(), that writes the magic byte and the schema id bytes. After this change, the job's throughput returned to levels comparable to AvroRowSerializationSchema.

Am I right in thinking this was a design flaw rather than intentional?
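For illustration, the workaround described above might look roughly like the following sketch inside RegistryAvroSerializationSchema. The field and helper names (registeredSchemaId, writeRegistryHeader) are invented for the example and are not from the actual patch:

    // Illustrative sketch only -- not the actual backported patch. Assumes this sits inside
    // RegistryAvroSerializationSchema and that subject, schemaRegistryClient, getSchema(),
    // getEncoder(), getOutputStream() and getDatumWriter() exist as shown in the thread.
    // Imports assumed: java.io.IOException, java.io.OutputStream, java.nio.ByteBuffer,
    // org.apache.avro.io.Encoder.
    private transient int registeredSchemaId;   // hypothetical field caching the schema id

    protected void checkAvroInitialized() {
        // ... existing one-time initialization (schema, datum writer, output stream) ...
        // Register (or look up) the schema once here, not per record, e.g.:
        // registeredSchemaId = schemaRegistryClient.register(subject, getSchema());
    }

    // Hypothetical helper: writes the Confluent wire-format header (magic byte + 4-byte schema id).
    private void writeRegistryHeader(OutputStream out) throws IOException {
        out.write(0);   // Confluent magic byte
        out.write(ByteBuffer.allocate(4).putInt(registeredSchemaId).array());
    }

    public byte[] serialize(T object) {
        checkAvroInitialized();
        if (object == null) {
            return null;
        }
        try {
            Encoder encoder = getEncoder();
            writeRegistryHeader(getOutputStream());   // no HTTP round trip per message
            getDatumWriter().write(object, encoder);
            encoder.flush();
            byte[] bytes = getOutputStream().toByteArray();
            getOutputStream().reset();
            return bytes;
        } catch (IOException e) {
            throw new WrappingRuntimeException("Failed to serialize record.", e);
        }
    }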
Hi,

Thanks a lot for your message. It's certainly not intentional to make an HTTP request for every single message :) Isn't the schemaRegistryClient an instance of CachedSchemaRegistryClient, which, as the name says, caches? Can you check with a debugger at runtime which registry client is used, and whether there are indeed no cache hits? Alternatively, you could check the log of the Schema Registry service.

Best,
Robert

On Tue, Feb 4, 2020 at 7:13 AM Steve Whelan <[hidden email]> wrote:
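For context, a caching registry client is expected to behave roughly like the following simplified sketch (illustrative only, not the actual CachedSchemaRegistryClient source); a cache hit avoids the HTTP round trip entirely:

    // Simplified, hypothetical sketch of a caching register() -- not Confluent's real code.
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.Schema;

    class CachingRegistryClientSketch {
        private final Map<String, Map<Schema, Integer>> idCache = new HashMap<>();

        synchronized int register(String subject, Schema schema) {
            Map<Schema, Integer> perSubject = idCache.computeIfAbsent(subject, s -> new HashMap<>());
            Integer cached = perSubject.get(schema);
            if (cached != null) {
                return cached;                           // cache hit: no HTTP request
            }
            int id = registerOverHttp(subject, schema);  // only a cache miss goes to the registry
            perSubject.put(schema, id);
            return id;
        }

        private int registerOverHttp(String subject, Schema schema) {
            // Placeholder for the actual REST call to the Schema Registry.
            throw new UnsupportedOperationException("illustrative stub");
        }
    }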
Hi Steve,

I think your observation is correct. If I am not mistaken, we should use schemaRegistryClient.getId(subject, schema) instead of schemaRegistryClient.register(subject, schema). I created an issue to track it: https://issues.apache.org/jira/browse/FLINK-15941

Would you maybe like to check it and prepare a fix for it ;) ?

Best,
Dawid
On 06/02/2020 16:11, Robert Metzger wrote:
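A sketch of the change Dawid suggests in ConfluentSchemaRegistryCoder.writeSchema(), assuming the schema has already been registered under the subject and that getId() is served from the client's cache on repeated calls (illustrative only; the actual fix lives in FLINK-15941):

    // Illustrative sketch of the suggestion above -- not the committed fix.
    public void writeSchema(Schema schema, OutputStream out) throws IOException {
        try {
            // Look the id up instead of (re-)registering on every call; with
            // CachedSchemaRegistryClient, repeated lookups should stay local.
            int schemaId = schemaRegistryClient.getId(subject, schema);
            out.write(CONFLUENT_MAGIC_BYTE);
            out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        } catch (RestClientException e) {
            throw new IOException("Could not retrieve schema id from registry", e);
        }
    }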
Robert,

You are correct that it is using a CachedSchemaRegistryClient object, so schemaRegistryClient.register() should be checking the cache before sending a request to the Registry. However, with debug logging turned on in my Registry, I can see a request being sent for every serialized message, which means the cache in schemaRegistryClient.register() is always empty. By adding some more debug logging, I think I found the issue within RegistryAvroSerializationSchema.serialize():

    public byte[] serialize(T object) {
        checkAvroInitialized();
        if (object == null) {
            return null;
        } else {
            try {
                Encoder encoder = getEncoder();
                schemaCoderProvider.get()
                    .writeSchema(getSchema(), getOutputStream());   // get() creates a new instance of {@link SchemaCoder}[1]
                getDatumWriter().write(object, encoder);
                encoder.flush();
                byte[] bytes = getOutputStream().toByteArray();
                getOutputStream().reset();
                return bytes;
            } catch (IOException e) {
                throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
            }
        }
    }

This schemaCoderProvider.get() call creates a new instance of SchemaCoder every time, instead of using the one instantiated inside RegistryAvroSerializationSchema.checkAvroInitialized(). That means we get an object with a new, empty cache on every call, so schemaRegistryClient.register() falls back to an HTTP request to the Registry. Simply changing that line to:

    schemaCoder.writeSchema(getSchema(), getOutputStream());

solved the issue. Since RegistryAvroSerializationSchema.checkAvroInitialized() is called first inside RegistryAvroSerializationSchema.serialize(), we do not have to worry about the schemaCoder object being null.

Happy to open a PR for the ticket created if this makes sense.

On Thu, Feb 6, 2020 at 10:38 AM Dawid Wysakowicz <[hidden email]> wrote:
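For completeness, the one-line fix described above amounts to something like the following sketch of serialize(), reusing the schemaCoder field set up in checkAvroInitialized() rather than asking the provider for a new coder per record (a sketch against the code quoted in the thread, not necessarily the exact patch):

    // Sketch of serialize() with the fix described above: reuse the SchemaCoder
    // (and therefore its cached registry client) created in checkAvroInitialized().
    public byte[] serialize(T object) {
        checkAvroInitialized();
        if (object == null) {
            return null;
        } else {
            try {
                Encoder encoder = getEncoder();
                schemaCoder.writeSchema(getSchema(), getOutputStream());   // no new SchemaCoder per record
                getDatumWriter().write(object, encoder);
                encoder.flush();
                byte[] bytes = getOutputStream().toByteArray();
                getOutputStream().reset();
                return bytes;
            } catch (IOException e) {
                throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
            }
        }
    }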
Steve, thanks a lot for looking into this more closely! Let's discuss the resolution of the issue in the ticket Dawid has created: https://issues.apache.org/jira/browse/FLINK-15941

Best,
Robert

On Thu, Feb 6, 2020 at 6:59 PM Steve Whelan <[hidden email]> wrote: