doing demultiplexing using Apache Flink


doing demultiplexing using Apache Flink

dhurandar S
Hi,

We have a use case where we have to demultiplex an incoming stream into multiple output streams.

We read from one Kafka topic and, as output, generate multiple Kafka topics. The logic for generating each new Kafka topic is different and not known beforehand. Users of the system keep adding new logic, and the system then needs to produce data in the new topic with that logic applied to the incoming stream.

Input to the system would be the logic (code or a SQL statement) and a destination (Kafka topic or S3 location). The system should be able to read this configuration and emit the corresponding output, ideally at runtime.

Any guidance on whether this is possible in Flink, and some pointers on how it can be achieved, would be appreciated.

regards,
Dhurandar

Re: doing demultiplexing using Apache Flink

Alexander Fedulov
Hi Dhurandar,

It is not supported out of the box; however, I think it is possible by doing the following:
1) Create a wrapper type containing the original message and the destination topic it is supposed to be sent to. You can enrich the messages with it in accordance with the configuration you mentioned.
2) Implement `KeyedSerializationSchema` and make its `getTargetTopic` return the desired topic.
3) Initialize `FlinkKafkaProducer011` with this custom `KeyedSerializationSchema`.
Please mind that `KeyedSerializationSchema` is marked as deprecated and is supposed to be superseded by the new `KafkaSerializationSchema`, which would require a slight modification, but, from what I can tell, it will still be possible to achieve such dynamic event dispatching.
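
For illustration, a minimal sketch of steps 1-3. The wrapper type `RoutedMessage` and its fields are hypothetical (not part of Flink), and `routedStream` is assumed to be the `DataStream<RoutedMessage>` produced by the enrichment step:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Hypothetical wrapper type (not part of Flink): the enrichment step
// upstream fills in targetTopic from the user-supplied configuration.
class RoutedMessage implements java.io.Serializable {
    byte[] payload;
    String targetTopic;
}

// Schema whose getTargetTopic routes each record to its own topic.
class RoutingSerializationSchema
        implements KeyedSerializationSchema<RoutedMessage> {

    @Override
    public byte[] serializeKey(RoutedMessage element) {
        return null; // no message key
    }

    @Override
    public byte[] serializeValue(RoutedMessage element) {
        return element.payload;
    }

    @Override
    public String getTargetTopic(RoutedMessage element) {
        return element.targetTopic; // per-record dynamic destination
    }
}

// Wiring, inside the job's main method; "fallback-topic" is only
// used when getTargetTopic returns null:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
DataStream<RoutedMessage> routedStream = ...; // enriched stream (assumed)
routedStream.addSink(
    new FlinkKafkaProducer011<>("fallback-topic",
        new RoutingSerializationSchema(), props));

With the newer `KafkaSerializationSchema`, the equivalent is to return a `ProducerRecord` with the desired topic from its `serialize` method.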

Best regards,
Alexander Fedulov

Re: doing demultiplexing using Apache Flink

dhurandar S
Thank you, Alexander, for the response. This is very helpful.
Can I apply the same pattern to S3 as well, i.e., read from Kafka or Kinesis and write multiple files to S3 or multiple topics to Kinesis?

regards,
Rahul


Re: doing demultiplexing using Apache Flink

Alexander Fedulov
This, too, should be possible.
Flink uses `StreamingFileSink` to transfer data to S3 [1]. You can pass it your custom bucket assigner [2]:

public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner)

which, similarly to `KeyedSerializationSchema`'s `getTargetTopic`, returns a destination for each input element:

BucketID getBucketId(IN element, BucketAssigner.Context context);
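
For illustration, a minimal sketch of such an assigner, reusing the hypothetical `RoutedMessage` wrapper from above with an assumed `destination` field; the base path and encoder choice are also assumptions:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Routes each element to a bucket (a sub-directory under the base path)
// named after its destination, so each destination gets its own S3 prefix.
class DestinationBucketAssigner
        implements BucketAssigner<RoutedMessage, String> {

    @Override
    public String getBucketId(RoutedMessage element, Context context) {
        return element.destination; // assumed field on the wrapper type
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

// Wiring: files end up under s3://my-bucket/output/<destination>/...
// (SimpleStringEncoder writes each element's toString(); a real job
// would plug in a proper Encoder for its payload format.)
StreamingFileSink<RoutedMessage> sink = StreamingFileSink
    .forRowFormat(new Path("s3://my-bucket/output"),
        new SimpleStringEncoder<RoutedMessage>("UTF-8"))
    .withBucketAssigner(new DestinationBucketAssigner())
    .build();

Using `String` as the bucket id keeps the builder's default generics happy and lets each distinct destination become its own directory, with files rolled according to the sink's rolling policy.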

--
Alexander Fedulov | Solutions Architect
Ververica GmbH


Re: doing demultiplexing using Apache Flink

dhurandar S
Thank you, Alexander. This is really helpful.

Can the input be Flink SQL? The idea is to provide the capability to take SQL as input and create new streams on demand for the given SQL.

So users of the system would provide SQL in configuration files, and from then on they could start listening to a topic or start reading data from files.
Can we load Flink SQL at runtime?

regards,
Dhurandar
