Hello all,

I am receiving a set of events in Avro format on different topics. I want to consume these and write them to S3 in Parquet format. I have written the job below, which creates a separate stream for each event and fetches its schema from the Confluent Schema Registry to create a Parquet sink for that event. This is working fine, but the one problem I am facing is that whenever a new event starts coming in, I have to change the YAML config and restart the job every time. Is there any way to avoid restarting the job so that it starts consuming the new set of events on its own?

YAML config: !com.bounce.config.EventTopologyConfig

Sink code: YamlReader reader = new YamlReader(topologyConfig);
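[Editor's note: the YAML config and sink code above are truncated in the archive. As context, here is a minimal sketch of the pattern the question describes — one Kafka source plus one Parquet sink per event, with each schema fetched from the Confluent Schema Registry. The topic list, registry URL, S3 paths, and the "<topic>-value" subject convention are all assumptions, not the original code.]

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

    import java.util.Properties;

    public class PerEventParquetJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            String registryUrl = "http://schema-registry:8081";   // assumed
            CachedSchemaRegistryClient registry = new CachedSchemaRegistryClient(registryUrl, 100);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // assumed
            props.setProperty("group.id", "parquet-writer");

            // one source + sink per event; the real job reads this list from the YAML config
            for (String event : new String[]{"event_a", "event_b"}) {
                // fetch the latest schema; assumes TopicNameStrategy subjects ("<topic>-value")
                Schema schema = new Schema.Parser().parse(
                        registry.getLatestSchemaMetadata(event + "-value").getSchema());

                DataStream<GenericRecord> stream = env.addSource(new FlinkKafkaConsumer<>(
                        event,
                        ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, registryUrl),
                        props));

                stream.addSink(StreamingFileSink
                        .forBulkFormat(new Path("s3://bucket/" + event),
                                ParquetAvroWriters.forGenericRecord(schema))
                        .build());
            }

            env.execute("per-event parquet job");
        }
    }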
Please help with this. Any suggestions?

On Sat, Jun 6, 2020 at 12:20 PM aj <[hidden email]> wrote:
I am stuck on this. Please give some suggestions.

On Tue, Jun 9, 2020, 21:40 aj <[hidden email]> wrote:
Hi Anuj,

There is currently no way to dynamically change the topology. It would be good to know why your current approach is not working (restart taking too long? Too frequent changes?)

So some ideas:

- Have some kind of submitter that restarts Flink automatically on config change (assumes that restart time is not the issue).
- Your job could periodically check for a change and then fail. However, you need to fail in a way that the topology is rebuilt; I guess it's close to a fatal error, and then the driver handles that error and restarts. (A rough sketch of this idea follows after this email.)
- Rewrite the job in such a way that you have only one sink in your job graph which demultiplexes the events to several internal sinks. Then you could simply add a new sink whenever a new event occurs.

The first option is the easiest and the last option the most versatile (you could even have different sink types mixed).

On Tue, Jun 23, 2020 at 5:34 AM aj <[hidden email]> wrote:
--
Arvid Heise | Senior Java Developer
Follow us @VervericaData
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
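[Editor's note: for the second option above, a periodic config check could look roughly like the following; the class name, config path, and check interval are all hypothetical. Note that a plain exception in a user function only triggers Flink's restart strategy, so the job must run with restarts disabled (or exhausted) and the submitting process has to catch the terminal failure and rebuild the topology.]

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Hypothetical watcher: passes records through unchanged, but fails the job
    // once the YAML config no longer matches what it saw at startup.
    public class ConfigChangeWatcher<T> extends RichMapFunction<T, T> {

        private final String configPath;
        private final long checkIntervalMillis;

        private transient String configAtStartup;
        private transient long lastCheck;

        public ConfigChangeWatcher(String configPath, long checkIntervalMillis) {
            this.configPath = configPath;
            this.checkIntervalMillis = checkIntervalMillis;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            configAtStartup = readConfig();
            lastCheck = System.currentTimeMillis();
        }

        @Override
        public T map(T value) throws Exception {
            long now = System.currentTimeMillis();
            if (now - lastCheck >= checkIntervalMillis) {
                lastCheck = now;
                if (!readConfig().equals(configAtStartup)) {
                    // With the restart strategy disabled, this fails the whole job;
                    // the external submitter then rebuilds the topology and resubmits.
                    throw new IllegalStateException("Topology config changed; resubmission required");
                }
            }
            return value;
        }

        private String readConfig() throws Exception {
            return new String(Files.readAllBytes(Paths.get(configPath)), StandardCharsets.UTF_8);
        }
    }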
Hi Arvid,

Would it be possible to implement a BucketAssigner that, for example, loads the configuration periodically from an external source and, according to the event type, decides on a different sub-folder?

Thanks,
Rafi

On Wed, Jun 24, 2020 at 10:53 PM Arvid Heise <[hidden email]> wrote:
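[Editor's note: what Rafi describes could look roughly like the sketch below, assuming the events arrive as Avro GenericRecords carrying their type in an "event_name" field — the field name and class are hypothetical, and the periodic external config lookup is elided. Note that a BucketAssigner only routes records into per-event sub-folders; the ParquetWriterFactory, and hence the write schema, is still fixed when the sink is built, which is the limitation Arvid's demultiplexing idea addresses.]

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    // Routes each record into a bucket (sub-folder) named after its event type.
    public class EventTypeBucketAssigner implements BucketAssigner<GenericRecord, String> {

        @Override
        public String getBucketId(GenericRecord element, Context context) {
            // assumes the Avro record carries its event name in a field
            return String.valueOf(element.get("event_name"));
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

It would be plugged into the sink builder via .withBucketAssigner(new EventTypeBucketAssigner()) before .build().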
Thanks, Arvid, for the detailed answers.

- "Have some kind of submitter that restarts Flink automatically on config change (assumes that restart time is not the issue)."
Yes, that can be written, but it does not solve the problem completely, because I want to avoid the job restart itself. Every time I restart, I also have to resume from the last checkpoint in S3.

- "Your job could periodically check for a change and then fail. However, you need to fail in a way that the topology is rebuilt. I guess it's close to a fatal error and then the driver handles that error and restarts."
Same reason as the first; I am also not sure how it would be implemented.

- "Rewrite the job in such a way that you have only one sink in your job graph which demultiplexes the event to several internal sinks. Then you could simply add a new sink whenever a new event occurs."
Please give me some idea of how this can be implemented; I want to try it.

I am also thinking: can I use broadcast state, where the config rules I currently keep in YAML are pushed into Kafka instead? I mainly just need the mapping from event name to Avro schema subject. Please correct me if I am thinking in the wrong direction.

On Thu, Jun 25, 2020 at 2:18 PM Rafi Aroch <[hidden email]> wrote:
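[Editor's note: the broadcast idea could be wired up roughly as follows — a sketch assuming a Kafka topic carrying (event name, schema subject) pairs as the broadcast side; the class, field name, and downstream handling are all assumptions.]

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class EventConfigBroadcast {

        // event name -> schema registry subject, shared with every subtask
        static final MapStateDescriptor<String, String> CONFIG =
                new MapStateDescriptor<>("event-config", Types.STRING, Types.STRING);

        // configStream would be consumed from a config topic in Kafka
        public static DataStream<GenericRecord> withConfig(
                DataStream<GenericRecord> events,
                DataStream<Tuple2<String, String>> configStream) {

            BroadcastStream<Tuple2<String, String>> broadcast = configStream.broadcast(CONFIG);

            return events.connect(broadcast).process(
                new BroadcastProcessFunction<GenericRecord, Tuple2<String, String>, GenericRecord>() {

                    @Override
                    public void processElement(GenericRecord value, ReadOnlyContext ctx,
                                               Collector<GenericRecord> out) throws Exception {
                        // forward only events with a known schema subject;
                        // assumes the record carries its type in an "event_name" field
                        String subject = ctx.getBroadcastState(CONFIG)
                                .get(String.valueOf(value.get("event_name")));
                        if (subject != null) {
                            out.collect(value);
                        }
                        // unknown events could be side-outputted or dropped here
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<String, String> rule, Context ctx,
                                                        Collector<GenericRecord> out) throws Exception {
                        // each new (event name, subject) pair updates the broadcast state
                        ctx.getBroadcastState(CONFIG).put(rule.f0, rule.f1);
                    }
                });
        }
    }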
Hi Anuj,

Yes, broadcast sounds really good. Now you just need to hide the structural variance (the variable number of sinks) behind a single sink that delegates to inner sinks:

    public class SplittingStreamingFileSink<IN> extends RichSinkFunction<IN>
            implements CheckpointedFunction, CheckpointListener {

        Map<String, StreamingFileSink<IN>> sinks = ...; // event name to sink

        // delegate each method of RichSinkFunction, CheckpointedFunction,
        // and CheckpointListener to all sinks in the map

        public void invoke(IN value, SinkFunction.Context context) throws Exception {
            StreamingFileSink<IN> sink = sinks.get(eventName(value));
            if (sink == null) {
                // create a new StreamingFileSink, add it to sinks, and call initializeState
            }
            sink.invoke(value, context);
        }
    }

Use the SplittingStreamingFileSink as the only sink in your workflow. You definitely need to wrap the SinkFunction.Context such that state gets prefixed (by eventName), but it should be rather straightforward.

On Thu, Jun 25, 2020 at 3:47 PM aj <[hidden email]> wrote:
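[Editor's note: building the per-event inner sink could reuse the registry lookup from the original job. A rough sketch — the registryClient field, the "<event>-value" subject convention, and the S3 path are assumptions:]

    // inside SplittingStreamingFileSink; registryClient is a CachedSchemaRegistryClient field
    private StreamingFileSink<GenericRecord> createSinkFor(String eventName) throws Exception {
        Schema schema = new Schema.Parser().parse(
                registryClient.getLatestSchemaMetadata(eventName + "-value").getSchema());
        return StreamingFileSink
                .forBulkFormat(new Path("s3://bucket/" + eventName),
                        ParquetAvroWriters.forGenericRecord(schema))
                .build();
    }

Before its first invoke, the new inner sink still needs setRuntimeContext, initializeState (with the wrapped, event-prefixed context Arvid mentions), and open called on it.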
Thanks, Arvid. I will try implementing the broadcast approach.

On Fri, Jun 26, 2020 at 1:10 AM Arvid Heise <[hidden email]> wrote: