Broadcasting control messages to a sink

Jaffe, Julian

Hey all,

 

I’m building a Flink app that pulls in messages from a Kafka topic and writes them out to disk using a custom bucketed sink. Each message needs to be parsed using a schema that is also needed when writing in the sink. This schema is read from a remote file on a distributed file system (it could also be fetched from a service). The schema will be updated very infrequently.

 

In order to support schema evolution, I have created a custom source that occasionally polls for updates and, if it finds one, parses the new schema and sends a message containing the serialized schema. I’ve connected these two streams and use a RichCoFlatMapFunction to flatten them back into a single output stream (schema events are used to update the parser; messages are parsed using the parser and emitted).

 

However, I need some way to communicate the updated schema to every task of the sink. Simply emitting a control message that is ignored when writing to disk means that only one sink partition will receive the message and thus update the schema. I thought about sending the control message as a side output and then broadcasting the resulting stream to the sink alongside the processed event input, but I couldn’t figure out a way to do so. For now, I’m bundling the schema used to parse each event with the event, storing the schema in the sink, and then checking every event’s schema against the stored schema, but this is fairly inefficient. Also, I’d like to eventually expand the types of control messages I can send to the sink, some of which may not be idempotent. Is there a better way to handle this pattern?


(Bonus question: ideally, I’d like to be able to perform an action when all sink partitions have picked up the new schema. I’m not aware of any way to emit metadata of this sort from Flink tasks beyond abusing the metrics system. That approach still leaves open the possibility of tasks picking up the new schema and then crashing for unrelated reasons, thus inflating the count of tasks using a specific schema. It also requires tracking at least the current level of parallelism, and probably Flink task state as well, outside of Flink. Are there any patterns for reporting metadata like this to the job manager?)

 

I’m using Flink 1.8.

Re: Broadcasting control messages to a sink

Piotr Nowojski-4
Hi Julian,

Have you seen Broadcast State [1]? I have never used it personally, but it sounds like something you want. Maybe your job should look like:

1. read raw messages from Kafka, without using the schema
2. read schema changes and broadcast them to 3. and 5.
3. deserialize Kafka records in BroadcastProcessFunction by using combined 1. and 2.
4. do your logic
5. serialize records using schema in another BroadcastProcessFunction by using combined 4. and 2.
6. write raw records using BucketingSink
?

Best,
Piotrek



Re: Broadcasting control messages to a sink

Jaffe, Julian

Thanks for the suggestion Piotr!

 

The problem is that the sink needs to have access to the schema (so that it can write the schema once per file instead of once per record) and thus needs to know when the schema has been updated. In this proposed architecture, I think the sink would still need to check each record to see whether it matches the current schema? The main problem I encountered when playing around with broadcast state was that I couldn’t figure out how to access the broadcast state within the sink, but perhaps I just haven’t thought about it the right way. I’ll meditate on the docs further 🙂

 

Julian

 


Re: Broadcasting control messages to a sink

Piotr Nowojski-4
Hi Julian,

I think the problem is that BroadcastProcessFunction and SinkFunction will be executed by separate operators, so they won't be able to share state. If you cannot split your logic into two, I think you will have to work around this problem differently.

1. Rely on operator chaining and wire both of them together.

If you set up your BroadcastProcessFunction and SinkFunction one after another, with the same parallelism, with the default chaining, and without any rebalance/keyBy in between, you can be sure they will be chained together. The record type passed between BroadcastProcessFunction and SinkFunction can then be a union type of a) your actual payload and b) the broadcast message. Upon initialization, or before processing the first record, if you have any broadcast state, you would need to forward its content to the downstream SinkFunction as well.

2. Another solution is to try embedding the SinkFunction inside the BroadcastProcessFunction. This will require some careful proxying and wrapping of calls.
3. As always, you can also write a custom operator that does the same thing.

For 2. and 3., I'm not entirely sure whether there are gotchas that I haven't thought through (state handling?), so if you can make 1. work for you, it will probably be the safer route.
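
As a rough illustration of option 1, the union type and a sink that tells payloads apart from control messages might look like the sketch below. It is plain Java with no Flink dependencies; `SinkInput`, `UnionAwareSink`, the string-typed schema, and the initial schema "v1" are all hypothetical stand-ins rather than Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Either a data record or a control message; exactly one field is non-null. */
final class SinkInput {
    final String payload;   // the actual record, null for control messages
    final String newSchema; // a broadcast schema update, null for data records

    private SinkInput(String payload, String newSchema) {
        this.payload = payload;
        this.newSchema = newSchema;
    }

    static SinkInput data(String payload) { return new SinkInput(payload, null); }
    static SinkInput control(String newSchema) { return new SinkInput(null, newSchema); }

    boolean isControl() { return newSchema != null; }
}

/** Hypothetical stand-in for one parallel instance of the sink. */
final class UnionAwareSink {
    private String currentSchema = "v1"; // assumed initial schema
    final List<String> written = new ArrayList<>();

    void invoke(SinkInput in) {
        if (in.isControl()) {
            currentSchema = in.newSchema; // update local state, write nothing
            return;
        }
        written.add(currentSchema + ":" + in.payload);
    }
}

final class UnionTypeDemo {
    static List<String> run() {
        UnionAwareSink sink = new UnionAwareSink();
        sink.invoke(SinkInput.data("a"));
        sink.invoke(SinkInput.control("v2"));
        sink.invoke(SinkInput.data("b"));
        return sink.written;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [v1:a, v2:b]
    }
}
```

The sink never has to compare schemas per record here; it only reacts when a control message arrives.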

Best,
Piotrek





Re: Broadcasting control messages to a sink

anuj.aj07
In reply to this post by Jaffe, Julian
Hi Jaffe,

I am also working on a similar type of problem.

I am receiving a set of events in Avro format on different topics. I want to consume these and write them to S3 in Parquet format.
I have written a job that creates a separate stream for each event type and fetches its schema from the Confluent Schema Registry to create a Parquet sink for that event.
This is working fine, but whenever a new event type starts arriving or a schema changes, I have to update the YAML config and restart the job. Is there any way to have the job start consuming a new set of events without a restart?

Since you are handling schema evolution, can you help me with this, and also with how to handle new events? In the config, I keep a mapping of events to schema subjects. Please share how you are solving this.


This is how I am currently doing it, but I would like to know a better way to handle it.
// One Kafka source for all topics; records are deserialized to GenericRecord
// using schemas fetched from the schema registry.
FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

DataStream<GenericRecord> dataStream = streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {

        // The "{}" placeholder is needed for the event name to show up in the log line.
        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // The sink's schema is fixed here, when the job graph is built.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Keep only the records that belong to this event type.
        DataStream<GenericRecord> outStream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
                genericRecord != null
                        && genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    LOG.error("failed to create the per-event streams", e);
}


Re: Broadcasting control messages to a sink

Jaffe, Julian

Hey AJ,

 

I’m not familiar with the stock Parquet sink, but if it requires a schema on creation you won’t be able to change the output schema without restarting the job. I’m using a custom sink that can update the schema it uses. The problem I’m facing is how to communicate those updates in an efficient way. Currently I’m checking every record for differences with the stored schema, which both adds a lot of overhead and creates discrepancies between when partitions update the schema they use to write and when a schema can be marked as “in use”.

For your use case, the analogous approach would be to replace the Parquet sink with a custom sink that manages the lifecycle of the underlying Parquet writer itself. Then you could control closing the current writer and creating a new one with an updated schema yourself, and thus do it in code instead of via a restart.
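
A minimal sketch of that idea, in plain Java with no Flink or Parquet dependencies, is below. `SchemaBoundWriter` is a hypothetical stand-in for a writer whose schema is fixed at creation, and `RollingSchemaSink` is a made-up name for the wrapper; a real sink would also have to deal with checkpointing and file finalization, which this leaves out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a writer whose schema is fixed at creation time. */
final class SchemaBoundWriter {
    final String schema;
    final List<String> rows = new ArrayList<>();
    boolean closed = false;

    SchemaBoundWriter(String schema) { this.schema = schema; }

    void write(String row) {
        if (closed) throw new IllegalStateException("writer already closed");
        rows.add(row);
    }

    void close() { closed = true; }
}

/** Sink that owns its writer and replaces it when the schema changes. */
final class RollingSchemaSink {
    private SchemaBoundWriter current;
    final List<SchemaBoundWriter> finished = new ArrayList<>();

    RollingSchemaSink(String initialSchema) {
        current = new SchemaBoundWriter(initialSchema);
    }

    /** Closes the current writer and opens a new one, unless the schema is unchanged. */
    void updateSchema(String newSchema) {
        if (newSchema.equals(current.schema)) return;
        current.close();
        finished.add(current);
        current = new SchemaBoundWriter(newSchema);
    }

    void write(String row) { current.write(row); }

    SchemaBoundWriter currentWriter() { return current; }
}

final class RollingSchemaSinkDemo {
    static RollingSchemaSink run() {
        RollingSchemaSink sink = new RollingSchemaSink("v1");
        sink.write("a");
        sink.write("b");
        sink.updateSchema("v2"); // rolls over to a new writer
        sink.write("c");
        sink.updateSchema("v2"); // same schema again: no rollover
        return sink;
    }

    public static void main(String[] args) {
        RollingSchemaSink sink = run();
        // prints "1 finished, current schema v2"
        System.out.println(sink.finished.size() + " finished, current schema " + sink.currentWriter().schema);
    }
}
```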

 

Julian

 


Re: Broadcasting control messages to a sink

Jaffe, Julian
In reply to this post by Piotr Nowojski-4

Hey Piotr,

 

Thanks for your help! The main thing I was missing was the .broadcast partition operation on a stream (searching for “broadcasting” obviously brought up the broadcast state pattern). This coupled with my misunderstanding of an error in my code as being an error in Flink code resulted in me making this a much harder problem than it needed to be.

 

For anyone who may find this in the future, Piotr’s suggestion is pretty spot-on. I wound up broadcasting (as in the partitioning strategy) my schema stream and connecting it to my event stream. I then processed those using a CoProcessFunction, using the schema messages to update the parsing for the events. I also emitted a side output message when I processed a new schema, using the same type as my main output messages. I once again broadcast-as-in-partitioning the side output stream, unioned it with my processed output from the CoProcessFunction and passed it to my sink, making sure to handle control messages before attempting to do any bucketing.

 

In poor ASCII art, it looks something like the below:

 

 

 _______________            ______________
| Schema Source |          | Event Source |
 ---------------            --------------
        |                          |
    Broadcast                      |
        |       ___________        |
         ----- | Processor | ------
               |           | ------ Control message side output
                -----------              |
                     |                   |
                     |               Broadcast
                     |                   |
                   Union ----------------
                     |
                  ________
                 |  Sink  |
                  --------
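
To make the last hop of the diagram concrete, here is a small plain-Java model of the delivery semantics. It is not Flink code: `Msg`, `SinkSubtask`, and `BroadcastModel` are hypothetical names, `forward` stands in for ordinary round-robin distribution of processed events, and `broadcast` stands in for the broadcast partitioning applied to the control-message side output. The model is strictly sequential, whereas a real union of two streams gives no ordering guarantee between its inputs, so in the actual job some in-flight events may reach a sink task before or after the control message.

```java
import java.util.ArrayList;
import java.util.List;

/** A message on the unioned stream: either a control message or a processed event. */
final class Msg {
    final boolean control;
    final String body; // schema id for control messages, event data otherwise

    Msg(boolean control, String body) {
        this.control = control;
        this.body = body;
    }
}

/** One parallel instance of the sink. */
final class SinkSubtask {
    String schema = "v1"; // assumed initial schema
    final List<String> written = new ArrayList<>();

    void invoke(Msg m) {
        if (m.control) { // handle control messages before any bucketing
            schema = m.body;
            return;
        }
        written.add(schema + "/" + m.body);
    }
}

final class BroadcastModel {
    final List<SinkSubtask> subtasks = new ArrayList<>();
    private int next = 0;

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) subtasks.add(new SinkSubtask());
    }

    /** Ordinary record: exactly one subtask receives it (round-robin here). */
    void forward(Msg m) {
        subtasks.get(next).invoke(m);
        next = (next + 1) % subtasks.size();
    }

    /** Broadcast partitioning: every subtask receives its own copy. */
    void broadcast(Msg m) {
        for (SinkSubtask s : subtasks) s.invoke(m);
    }

    static BroadcastModel run() {
        BroadcastModel model = new BroadcastModel(3);
        model.forward(new Msg(false, "e1"));
        model.broadcast(new Msg(true, "v2")); // schema update reaches all three subtasks
        model.forward(new Msg(false, "e2"));
        model.forward(new Msg(false, "e3"));
        model.forward(new Msg(false, "e4"));
        return model;
    }

    public static void main(String[] args) {
        for (SinkSubtask s : run().subtasks) {
            System.out.println(s.schema + " " + s.written);
        }
        // prints:
        // v2 [v1/e1, v2/e4]
        // v2 [v2/e2]
        // v2 [v2/e3]
    }
}
```

Had the control message gone through `forward` instead, only one of the three subtasks would have switched to "v2", which is the problem I started with.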

 

I hope this is helpful to someone.

 

Julian

 

From: Piotr Nowojski <[hidden email]>
Date: Wednesday, October 14, 2020 at 11:22 PM
To: "Jaffe, Julian" <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Broadcasting control messages to a sink

 

Hi Julian,

 

I think the problem is that BroadcastProcessFunction and SinkFunction will be executed by separate operators, so they won't be able to share state. If you can not split your logic into two, I think you will have to workaround this problem differently.

 

1. Relay on operator chaining and wire both of them together.

 

If you set up your BroadcastProcessFunction and SinkFunction one after another, with the same parallelism, with the default chaining, without any rebalance/keyBy in between, you can be sure they will be chained together. So the output type of your record between BroadcastProcessFunction and SinkFunction, can be a Union type, of a) your actual payload, b) broadcasted message. Upon initialization/before processing first record, if you have any broadcast state, you would need to forward it's content to the downstream SinkFunction as well.

 

2. Another solution is that maybe you can try to embed SinkFunction inside the BroadcastProcessFunction? This will require some careful proxying and wrapping calls.

3. As always, you can also write a custom operator that will be doing the same thing.

 

For the 2. and 3. I'm not entirely sure if there are some gotchas that I haven't thought through (state handling?), so if you can make 1. work for you, it will probably be a safer route.

 

Best,

Piotrek

 

 

 

 

śr., 14 paź 2020 o 19:42 Jaffe, Julian <[hidden email]> napisał(a):

Thanks for the suggestion Piotr!

 

The problem is that the sink needs to have access to the schema (so that it can write the schema only once per file instead of record) and thus needs to know when the schema has been updated. In this proposed architecture, I think the sink would still need to check each record to see if the current schema matches the new record or not? The main problem I encountered when playing around with broadcast state was that I couldn’t figure out how to access the broadcast state within the sink, but perhaps I just haven’t thought about it the right way. I’ll meditate on the docs further  🙂

 

Julian

 

From: Piotr Nowojski <[hidden email]>
Date: Wednesday, October 14, 2020 at 6:35 AM
To: "Jaffe, Julian" <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Broadcasting control messages to a sink

 

Hi Julian,

 

Have you seen Broadcast State [1]? I have never used it personally, but it sounds like something you want. Maybe your job should look like:

 

1. read raw messages from Kafka, without using the schema

2. read schema changes and broadcast them to 3. and 5.

3. deserialize Kafka records in BroadcastProcessFunction by using combined 1. and 2.

4. do your logic o

5. serialize records using schema in another BroadcastProcessFunction by using combined 4. and 2.

6. write raw records using BucketingSink

?

 

Best,

Piotrek

 

 

Wed, Oct 14, 2020 at 11:01, Jaffe, Julian <[hidden email]> wrote:

Hey all,

 

I’m building a Flink app that pulls in messages from a Kafka topic and writes them out to disk using a custom bucketed sink. Each message needs to be parsed using a schema that is also needed when writing in the sink. This schema is read from a remote file on a distributed file system (it could also be fetched from a service). The schema will be updated very infrequently.

 

In order to support schema evolution, I have created a custom source that occasionally polls for updates and if it finds one parses the new schema and sends a message containing the serialized schema. I’ve connected these two streams and then use a RichCoFlatMapFunction to flatten them back into a single output stream (schema events get used to update the parser, messages get parsed using the parser and emitted).

 

However, I need some way to communicate the updated schema to every task of the sink. Simply emitting a control message that is ignored when writing to disk means that only one sink partition will receive the message and thus update the schema. I thought about sending the control message as side output and then broadcasting the resulting stream to the sink alongside the processed event input but I couldn’t figure out a way to do so. For now, I’m bundling the schema used to parse each event with the event, storing the schema in the sink, and then checking every event’s schema against the stored schema but this is fairly inefficient. Also, I’d like to eventually increase the types of control messages I can send to the sink, some of which may not be idempotent. Is there a better way to handle this pattern?


(Bonus question: ideally, I’d like to be able to perform an action when all sink partitions have picked up the new schema. I’m not aware of any way to emit metadata of this sort from Flink tasks beyond abusing the metrics system. This approach still leaves open the possibility of tasks picking up the new schema and then crashing for unrelated reasons thus inflating the count of tasks using a specific schema and moreover requires tracking at least the current level of parallelism and probably also Flink task state outside of Flink. Are there any patterns for reporting metadata like this to the job manager?)

 

I’m using Flink 1.8.


Re: Broadcasting control messages to a sink

Piotr Nowojski-4
Hi Julian,

Glad to hear it worked! And thanks for coming back to us :)

Best,
Piotrek

Sat, Oct 17, 2020 at 04:22, Jaffe, Julian <[hidden email]> wrote:

Hey Piotr,

 

Thanks for your help! The main thing I was missing was the .broadcast partition operation on a stream (searching for “broadcasting” obviously brought up the broadcast state pattern). This coupled with my misunderstanding of an error in my code as being an error in Flink code resulted in me making this a much harder problem than it needed to be.

 

For anyone who may find this in the future, Piotr’s suggestion is pretty spot-on. I wound up broadcasting (as in the partitioning strategy) my schema stream and connecting it to my event stream. I then processed those using a CoProcessFunction, using the schema messages to update the parsing for the events. I also emitted a side output message when I processed a new schema, using the same type as my main output messages. I once again broadcast-as-in-partitioning the side output stream, unioned it with my processed output from the CoProcessFunction and passed it to my sink, making sure to handle control messages before attempting to do any bucketing.

 

In poor ASCII art, it looks something like the below:

 

 

 _______________          ______________
| Schema Source |        | Event Source |
 ---------------          --------------
        |                        |
    Broadcast                    |
        |       ___________      |
        -------| Processor |------
               |           |-------- Control message side output
                -----------         |
                     |              |
                     |          Broadcast
                     |              |
                   Union ------------
                     |
                  ______
                 | Sink |
                  ------
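As a rough illustration of why the second broadcast matters, here is a small plain-Java model with no Flink dependency. `Message`, `SinkSubtask` and `BroadcastModel` are hypothetical stand-ins, and the schema is just a string. In a real job the fan-out would come from calling `broadcast()` on the side output stream rather than from a loop like this.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical message type: either an event payload or a schema-update control message. */
final class Message {
    final boolean control;
    final String body;

    Message(boolean control, String body) {
        this.control = control;
        this.body = body;
    }
}

/** Stand-in for one parallel sink subtask; it only remembers what it has seen. */
final class SinkSubtask {
    String schema = "v1";
    final List<String> written = new ArrayList<>();

    void invoke(Message m) {
        if (m.control) {      // control messages are handled first and never written
            schema = m.body;
            return;
        }
        written.add(schema + ":" + m.body);
    }
}

final class BroadcastModel {
    /** Broadcast partitioning: every subtask receives a copy of the message. */
    static void broadcast(List<SinkSubtask> sinks, Message m) {
        for (SinkSubtask s : sinks) {
            s.invoke(m);
        }
    }

    /** Point-to-point partitioning (here: by hash): exactly one subtask receives the message. */
    static void forwardToOne(List<SinkSubtask> sinks, Message m) {
        sinks.get(Math.floorMod(m.body.hashCode(), sinks.size())).invoke(m);
    }
}
```

With three subtasks, a control message sent through `forwardToOne` updates a single subtask, while the same message sent through `broadcast` updates all three, which is the behaviour the sink needs.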

 

I hope this is helpful to someone.

 

Julian

 



Re: Broadcasting control messages to a sink

anuj.aj07
Hi Jaffe,

Could you please share some sample code showing how you wrote the custom sink and how you are using this broadcast pattern to update the schema at run time? It would help me.

On Sat, Oct 17, 2020 at 1:55 PM Piotr Nowojski <[hidden email]> wrote:
Hi Julian,

Glad to hear it worked! And thanks for coming back to us :)

Best,
Piotrek




--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07




Re: Broadcasting control messages to a sink

Jaffe, Julian

Hey AJ,

 

Depending on your control messages and what you’re trying to accomplish, you can simplify the application even further by stripping out the second broadcast and letting operator chaining guarantee that control messages flow appropriately. This results in:

 

 _______________          ______________
| Schema Source |        | Event Source |
 ---------------          --------------
        |                        |
    Broadcast                    |
        |       ___________      |
        -------| Processor |------
               |           |
                -----------
                     |
                  ______
                 | Sink |
                  ------

 

Whatever your final topology ends up being, your sink needs to know about the updated schema. If you aren’t using a schema registry, this means you need some way to distinguish between events and control messages in the invoke method of your sink, for example:


@Override
public void invoke(ValueType value, Context ctx) throws Exception {
    // Handle control messages first; they must never be written out as data.
    if (value.isControlMessage()) {
        updateSchema(value);
        return;
    }
    writeValue(value);
}

 

 

I don’t know what types you’re using or how you’re storing output, so I can’t be much more concrete than this, but if it helps, here’s a simplified version of what I’m doing (I’m reading in Avro events and writing them out in bucketed files):

 

 

@Override
public void invoke(GenericRecord value, Context ctx) throws Exception {
    if (value != null && value.getSchema().equals(CONTROL_SCHEMA)) {
        /*
         * My schema update control messages don’t actually contain a new schema. This function
         * fetches the schema from a remote location, performs various checks and updates various
         * systems, and then sets the working schema to the new schema.
         */
        updateSchema(value);
        return;
    }

    // … determining the correct bucket for the provided value and initializing the bucketer if necessary …

    if (!bucketState.schema.equals(this.schema)) {
        bucketState.refreshSchema(this.schema, this.schemaTimestamp);
        openNewPartFile(bucketPath, bucketState);
    } else if (shouldRoll(bucketState, currentProcessingTime)) {
        openNewPartFile(bucketPath, bucketState);
    }

    bucketState.fileWriter.append(value);
}

 

 

 

I’ve tried to use the method names that match the BucketingSink interface as much as possible to aid comprehension. I’ve also stripped out a lot of internal logic, instrumentation, sanity checks, etc. The main idea is that

 

  1. Your sink needs to keep track of your schema and update it in response to control messages (in the simplest case, the control messages can just contain the new schema)
  2. You need some way to refresh the schema used by your actual writers, which may involve closing current outputs and creating new ones
  3. Your schema needs to be serializable so that it can be snapshotted in checkpoints and restored, or you need some way to determine the correct schema in the open() or initializeState() methods
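The three points above can be sketched in plain Java, without Flink or Avro on the classpath. `SchemaAwareSink` is a hypothetical class, schemas and records are plain strings, and a "part file" is just a string in a list. In a real sink the snapshot and restore steps would more likely live in `snapshotState()` and `initializeState()`, so treat this only as a model of the control flow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a sink subtask; schemas and records are plain strings here. */
final class SchemaAwareSink {
    private String schema;                 // working schema, replaced by control messages (point 1)
    private String openFileSchema = null;  // schema of the currently open part file, if any
    final List<String> partFiles = new ArrayList<>(); // one entry per part file: "schema|rec,rec,"

    SchemaAwareSink(String initialSchema) {
        this.schema = initialSchema;
    }

    void invoke(boolean isControl, String value) {
        if (isControl) {                       // handle control messages before any bucketing
            schema = value;
            return;
        }
        if (!schema.equals(openFileSchema)) {  // point 2: roll to a new file when the schema changed
            partFiles.add(schema + "|");
            openFileSchema = schema;
        }
        int last = partFiles.size() - 1;
        partFiles.set(last, partFiles.get(last) + value + ",");
    }

    /** Point 3: the working schema must be recoverable after a failure; here it is simply returned. */
    String snapshotState() {
        return schema;
    }

    /** Rebuilds a sink from a snapshot; no part file is open yet, so the next record opens one. */
    static SchemaAwareSink restoreState(String snapshot) {
        return new SchemaAwareSink(snapshot);
    }
}
```

Note that the schema change only takes effect on the next record, so an idle bucket keeps its old file open until something is written to it. Whether that is acceptable depends on your use case.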

 

I hope this helps!

 

 

 

For anyone reading this who might be using Avro GenericRecords, note that Flink doesn’t serialize GenericRecords efficiently. You’ll want to write a custom serializer on the pattern of http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-performance-td12019.html, the eBay GenericAvroSerializer for Spark, or the many other GenericRecord serializers out there that replace the full schema with the schema’s fingerprint. If your schema is very large, you’ll want to go further and work on schema hashcodes instead of fingerprints (with appropriate safeguards!) because fingerprinting and checking RecordSchemas for equality quickly becomes expensive. In the sample code above, I include the check `(!bucketState.schema.equals(this.schema))`. Make sure that you’re actually comparing schema fingerprints or the like instead of directly calling schema.equals(otherSchema).
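A minimal sketch of the fingerprint comparison, again in plain Java with no Avro dependency. `FingerprintedSchema` is a hypothetical wrapper, and the SHA-256-based fingerprint over the raw schema text is only a stand-in. With Avro you would more likely use something like `SchemaNormalization.parsingFingerprint64`, which works on the schema's canonical form, so two schemas that differ only in whitespace would still match there but not here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical wrapper that computes a schema's fingerprint once and compares by it afterwards. */
final class FingerprintedSchema {
    final String json;
    final long fingerprint;

    FingerprintedSchema(String json) {
        this.json = json;
        this.fingerprint = fingerprintOf(json); // paid once per schema, not once per record
    }

    /** Stand-in fingerprint: the first 8 bytes of the SHA-256 digest of the raw schema text. */
    static long fingerprintOf(String json) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(json.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest).getLong();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    /** Cheap per-record check: a single long comparison instead of a deep schema equality test. */
    boolean sameSchemaAs(FingerprintedSchema other) {
        return fingerprint == other.fingerprint;
    }
}
```

In the sample sink, the check `!bucketState.schema.equals(this.schema)` would then become a call along the lines of `!bucketState.schema.sameSchemaAs(this.schema)`, assuming both fields hold such a wrapper.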

 

 

Julian

 

 

From: aj <[hidden email]>
Date: Friday, December 4, 2020 at 7:20 AM
To: Piotr Nowojski <[hidden email]>
Cc: "Jaffe, Julian" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Broadcasting control messages to a sink

 

Hi Jaffe,

Could you please share some sample code showing how you wrote the custom sink and how you are using this broadcast pattern to update the schema at run time? It would help me.

 
