Hi,

We have a use case where we have to demultiplex an incoming stream into multiple output streams. We read from one Kafka topic and, as output, produce multiple Kafka topics. The logic that generates each new topic is different and not known beforehand: users of the system keep adding new logic, and the system then needs to produce data in the new topic with that logic applied to the incoming stream. The input to the system would be the logic code or a SQL statement plus a destination topic or S3 location. The system should be able to read this configuration and emit to those destinations, ideally at runtime.

Any guidance on whether this is possible in Flink, and some pointers on how it can be achieved?

regards,
Dhurandar
Hi Dhurandar,

It is not supported out of the box; however, I think it is possible by doing the following:

1) Create a wrapper type containing the original message and the destination topic it is supposed to be sent to. You can enrich the messages with it according to the configuration you mentioned.
2) Extend `KeyedSerializationSchema` and make its `getTargetTopic` return the desired topic.
3) Initialize `FlinkKafkaProducer011` with this custom `KeyedSerializationSchema`.

(A minimal sketch of these steps is appended below.)

Please mind that `KeyedSerializationSchema` is marked as deprecated and is supposed to be replaced by the new `KafkaSerializationSchema`, which would require a slight modification, but, from what I can tell, it will still be possible to achieve such dynamic event dispatching.

Best regards,
Alexander Fedulov
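For illustration, a minimal sketch of steps 1 and 2 above, assuming Flink 1.10-era APIs, a plain String payload, and hypothetical names (`RoutedMessage`, `RoutingSerializationSchema`) that are not from the thread:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Step 1: hypothetical wrapper carrying the original payload plus its destination topic.
class RoutedMessage {
    final String targetTopic;
    final String payload;

    RoutedMessage(String targetTopic, String payload) {
        this.targetTopic = targetTopic;
        this.payload = payload;
    }
}

// Step 2: serialization schema that routes each record to the topic stored in the wrapper.
class RoutingSerializationSchema implements KeyedSerializationSchema<RoutedMessage> {
    @Override
    public byte[] serializeKey(RoutedMessage element) {
        return null; // no message key
    }

    @Override
    public byte[] serializeValue(RoutedMessage element) {
        return element.payload.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(RoutedMessage element) {
        return element.targetTopic; // overrides the producer's default topic per record
    }
}
```

Step 3 would then wire the schema into the producer, e.g. (assuming a `routedStream` of `RoutedMessage` and an illustrative bootstrap server / fallback topic):

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// "fallback-topic" is only used when getTargetTopic returns null.
routedStream.addSink(
        new FlinkKafkaProducer011<>("fallback-topic", new RoutingSerializationSchema(), props));
```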
Thank you Alexander for the response, this is very helpful. Can I apply the same pattern to S3 as well, i.e. read from Kafka or Kinesis and write multiple files in S3 or multiple topics in Kinesis?

regards,
Rahul
This too should be possible. Flink uses `StreamingFileSink` to transfer data to S3 [1]. You can pass it your custom bucket assigner [2]:

`public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner)`

which, similarly to the `KeyedSerializationSchema`, returns a destination for each input element:

`BucketID getBucketId(IN element, BucketAssigner.Context context);`

Best regards,
Alexander Fedulov
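As an illustration, a minimal sketch of such a bucket assigner, reusing the hypothetical `RoutedMessage` wrapper from the earlier sketch and assuming its destination field doubles as the S3 sub-path (class name and paths are illustrative, not from the thread):

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Bucket assigner that derives the bucket id (a sub-directory under the sink's base path)
// from the record itself, analogous to getTargetTopic on the Kafka side.
class RoutingBucketAssigner implements BucketAssigner<RoutedMessage, String> {
    @Override
    public String getBucketId(RoutedMessage element, Context context) {
        return element.targetTopic; // e.g. "destination-a" -> s3://my-bucket/output/destination-a/...
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```

It would then be plugged into the sink builder, e.g. (again assuming a `routedStream` of `RoutedMessage`):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamingFileSink<RoutedMessage> s3Sink = StreamingFileSink
        .forRowFormat(new Path("s3://my-bucket/output"),
                new SimpleStringEncoder<RoutedMessage>("UTF-8")) // writes element.toString() per line
        .withBucketAssigner(new RoutingBucketAssigner())
        .build();

routedStream.addSink(s3Sink);
```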
|
Thank you, Alexander, this is really helpful. Can the input be Flink SQL? The idea is to provide the capability to take SQL as input and create new streams on demand for the given SQL, so users of the system provide "SQL" in configuration files and can then start listening to a topic or reading data from files. Can we load Flink SQL at runtime?

regards,
Dhurandar
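For illustration of the idea being asked about, a minimal sketch of executing a user-supplied SQL string over a registered stream with the Table API, assuming Flink 1.10-era APIs; `inputStream`, the view name, and `sqlFromConfig` are illustrative assumptions, not from the thread:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the incoming stream as a table the user-supplied SQL can refer to.
tableEnv.createTemporaryView("input_events", inputStream);

// The SQL text would come from the user-provided configuration (placeholder shown here).
String sqlFromConfig = "SELECT * FROM input_events WHERE ...";
Table result = tableEnv.sqlQuery(sqlFromConfig);

// Convert back to a DataStream, which can then be routed to the configured topic or S3 location.
DataStream<Row> resultStream = tableEnv.toAppendStream(result, Row.class);
```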