Multiple Sinks for a Single Soure

classic Classic list List threaded Threaded
13 messages Options
Reply | Threaded
Open this post in threaded view
|

Multiple Sinks for a Single Soure

Prasanna kumar
Hi,

There is a single source of events for me in my system. 

I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]

I am able send to one sink.

By adding more sink stream to the source stream could we achieve it . Are there any shortcomings.  

Please let me know if any one here has successfully implemented one .

Thanks,
Prasanna.
Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Piotr Nowojski-3
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
>
> Hi,
>
> There is a single source of events for me in my system.
>
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>
> I am able send to one sink.
>
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings.  
>
> Please let me know if any one here has successfully implemented one .
>
> Thanks,
> Prasanna.

Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Prasanna kumar
Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
>
> Hi,
>
> There is a single source of events for me in my system.
>
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>
> I am able send to one sink.
>
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings. 
>
> Please let me know if any one here has successfully implemented one .
>
> Thanks,
> Prasanna.

Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Piotr Nowojski-3
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
>
> Hi,
>
> There is a single source of events for me in my system.
>
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>
> I am able send to one sink.
>
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings. 
>
> Please let me know if any one here has successfully implemented one .
>
> Thanks,
> Prasanna.


Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Prasanna kumar
Thanks Piotr for the Reply. 

I will explain my requirement in detail. 

Table Updates -> Generate Business Events -> Subscribers 

Source Side
There are CDC of 100 tables which the framework needs to listen to. 

Event Table Mapping

There would be Event associated with table in a m:n fashion. 

say there are tables TA, TB, TC. 

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC 

Event Sink Mapping

EA has following sinks. kafka topic SA,SA2,SAC. 
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC. 

The point is the sink are not predefined. [. But i only see the example online where , flink code having explicit myStream.addSink(sink2);   ]

We expect around 500 types of events in our platform in another 2 years time. 

We are looking at writing a generic job for the same , rather than writing one for new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
>
> Hi,
>
> There is a single source of events for me in my system.
>
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>
> I am able send to one sink.
>
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings. 
>
> Please let me know if any one here has successfully implemented one .
>
> Thanks,
> Prasanna.


Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Piotr Nowojski-3
Hi,

I’m not sure if I fully understand what do you mean by

> The point is the sink are not predefined.

You must know before submitting the job, what sinks are going to be used in the job. You can have some custom logic, that would filter out records before writing them to the sinks, as I proposed before, or you could use side outputs [1] would be better suited to your use case? 

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On 26 May 2020, at 12:20, Prasanna kumar <[hidden email]> wrote:

Thanks Piotr for the Reply. 

I will explain my requirement in detail. 

Table Updates -> Generate Business Events -> Subscribers 

Source Side
There are CDC of 100 tables which the framework needs to listen to. 

Event Table Mapping

There would be Event associated with table in a m:n fashion. 

say there are tables TA, TB, TC. 

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC 

Event Sink Mapping

EA has following sinks. kafka topic SA,SA2,SAC. 
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC. 

The point is the sink are not predefined. [. But i only see the example online where , flink code having explicit myStream.addSink(sink2);   ]

We expect around 500 types of events in our platform in another 2 years time. 

We are looking at writing a generic job for the same , rather than writing one for new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
>
> Hi,
>
> There is a single source of events for me in my system.
>
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>
> I am able send to one sink.
>
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings. 
>
> Please let me know if any one here has successfully implemented one .
>
> Thanks,
> Prasanna.



Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Prasanna kumar
Piotr, 

There is an event and subscriber registry as JSON file which has the table event mapping and event-subscriber mapping as mentioned below.

Based on the set JSON , we need to job to go through the table updates and create events and for each event there is a way set how to sink them.

The sink streams have to be added based on this JSON. Thats what i mentioned as no predefined sink in code earlier.

You could see that each event has different set of sinks. 

Just checking how much generic could Side-output streams be ?.

Source -> generate events -> (find out sinks dynamically in code ) -> write to the respective sinks. 

{
  " tablename ": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            {
              "columnname": "name"
            },
            {
              "columnname": "loginenabled",
              "value": "Y"
            }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": {
                "folder": "aws://folderC2"
              }}, ]}]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
          "eventname": "USERINSERT",
          "operation": "insert",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": {
                "topicname": "new_users"
              }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password":  "pwd"
              }}, ]}]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": {
                "topicname": "deleterecords"
             }}, ]}]
     },
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure if I fully understand what do you mean by

> The point is the sink are not predefined.

You must know before submitting the job, what sinks are going to be used in the job. You can have some custom logic, that would filter out records before writing them to the sinks, as I proposed before, or you could use side outputs [1] would be better suited to your use case? 

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On 26 May 2020, at 12:20, Prasanna kumar <[hidden email]> wrote:

Thanks Piotr for the Reply. 

I will explain my requirement in detail. 

Table Updates -> Generate Business Events -> Subscribers 

Source Side
There are CDC of 100 tables which the framework needs to listen to. 

Event Table Mapping

There would be Event associated with table in a m:n fashion. 

say there are tables TA, TB, TC. 

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC 

Event Sink Mapping

EA has following sinks. kafka topic SA,SA2,SAC. 
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC. 

The point is the sink are not predefined. [. But i only see the example online where , flink code having explicit myStream.addSink(sink2);   ]

We expect around 500 types of events in our platform in another 2 years time. 

We are looking at writing a generic job for the same , rather than writing one for new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
>
> Hi,
>
> There is a single source of events for me in my system.
>
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>
> I am able send to one sink.
>
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings. 
>
> Please let me know if any one here has successfully implemented one .
>
> Thanks,
> Prasanna.



Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Alexander Fedulov
Hi Prasanna,

if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. Side output produces a stream. Create all of the side output tags, associate each of them with one sink, add conditional logic around `ctx.output(outputTag, ... );`  to decide where to dispatch the messages  (see [1]), collect to none or many side outputs, depending on your logic.

--

Alexander Fedulov | Solutions Architect



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time



On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <[hidden email]> wrote:
Piotr, 

There is an event and subscriber registry as JSON file which has the table event mapping and event-subscriber mapping as mentioned below.

Based on the set JSON , we need to job to go through the table updates and create events and for each event there is a way set how to sink them.

The sink streams have to be added based on this JSON. Thats what i mentioned as no predefined sink in code earlier.

You could see that each event has different set of sinks. 

Just checking how much generic could Side-output streams be ?.

Source -> generate events -> (find out sinks dynamically in code ) -> write to the respective sinks. 

{
  " tablename ": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            {
              "columnname": "name"
            },
            {
              "columnname": "loginenabled",
              "value": "Y"
            }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": {
                "folder": "aws://folderC2"
              }}, ]}]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
          "eventname": "USERINSERT",
          "operation": "insert",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": {
                "topicname": "new_users"
              }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password":  "pwd"
              }}, ]}]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": {
                "topicname": "deleterecords"
             }}, ]}]
     },
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure if I fully understand what do you mean by

> The point is the sink are not predefined.

You must know before submitting the job, what sinks are going to be used in the job. You can have some custom logic, that would filter out records before writing them to the sinks, as I proposed before, or you could use side outputs [1] would be better suited to your use case? 

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On 26 May 2020, at 12:20, Prasanna kumar <[hidden email]> wrote:

Thanks Piotr for the Reply. 

I will explain my requirement in detail. 

Table Updates -> Generate Business Events -> Subscribers 

Source Side
There are CDC of 100 tables which the framework needs to listen to. 

Event Table Mapping

There would be Event associated with table in a m:n fashion. 

say there are tables TA, TB, TC. 

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC 

Event Sink Mapping

EA has following sinks. kafka topic SA,SA2,SAC. 
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC. 

The point is the sink are not predefined. [. But i only see the example online where , flink code having explicit myStream.addSink(sink2);   ]

We expect around 500 types of events in our platform in another 2 years time. 

We are looking at writing a generic job for the same , rather than writing one for new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
>
> Hi,
>
> There is a single source of events for me in my system.
>
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>
> I am able send to one sink.
>
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings. 
>
> Please let me know if any one here has successfully implemented one .
>
> Thanks,
> Prasanna.



Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Prasanna kumar
Alexander, 

Thanks for the reply. Will implement and come back in case of any questions. 

Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <[hidden email]> wrote:
Hi Prasanna,

if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. Side output produces a stream. Create all of the side output tags, associate each of them with one sink, add conditional logic around `ctx.output(outputTag, ... );`  to decide where to dispatch the messages  (see [1]), collect to none or many side outputs, depending on your logic.

--

Alexander Fedulov | Solutions Architect



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time



On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <[hidden email]> wrote:
Piotr, 

There is an event and subscriber registry as JSON file which has the table event mapping and event-subscriber mapping as mentioned below.

Based on the set JSON , we need to job to go through the table updates and create events and for each event there is a way set how to sink them.

The sink streams have to be added based on this JSON. Thats what i mentioned as no predefined sink in code earlier.

You could see that each event has different set of sinks. 

Just checking how much generic could Side-output streams be ?.

Source -> generate events -> (find out sinks dynamically in code ) -> write to the respective sinks. 

{
  " tablename ": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            {
              "columnname": "name"
            },
            {
              "columnname": "loginenabled",
              "value": "Y"
            }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": {
                "folder": "aws://folderC2"
              }}, ]}]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
          "eventname": "USERINSERT",
          "operation": "insert",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": {
                "topicname": "new_users"
              }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password":  "pwd"
              }}, ]}]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": {
                "topicname": "deleterecords"
             }}, ]}]
     },
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure if I fully understand what do you mean by

> The point is the sink are not predefined.

You must know before submitting the job, what sinks are going to be used in the job. You can have some custom logic, that would filter out records before writing them to the sinks, as I proposed before, or you could use side outputs [1] would be better suited to your use case? 

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On 26 May 2020, at 12:20, Prasanna kumar <[hidden email]> wrote:

Thanks Piotr for the Reply. 

I will explain my requirement in detail. 

Table Updates -> Generate Business Events -> Subscribers 

Source Side
There are CDC of 100 tables which the framework needs to listen to. 

Event Table Mapping

There would be Event associated with table in a m:n fashion. 

say there are tables TA, TB, TC. 

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC 

Event Sink Mapping

EA has following sinks. kafka topic SA,SA2,SAC. 
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC. 

The point is the sink are not predefined. [. But i only see the example online where , flink code having explicit myStream.addSink(sink2);   ]

We expect around 500 types of events in our platform in another 2 years time. 

We are looking at writing a generic job for the same , rather than writing one for new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
>
> Hi,
>
> There is a single source of events for me in my system.
>
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>
> I am able send to one sink.
>
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings. 
>
> Please let me know if any one here has successfully implemented one .
>
> Thanks,
> Prasanna.



Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Prasanna kumar
Hi ,

I have a Event router Registry as this. By reading this as input i need to create a Job which would redirect the messages to the correct sink as per condition.
{
"eventRouterRegistry": [
{ "eventType": "biling", "outputTopic": "billing" },
{ "eventType": "cost", "outputTopic": "cost" },
{ "eventType": "marketing", "outputTopic": "marketing" }
]
}
I tried the following two approaches. 
1) Using the Filter method

public static void setupRouterJobsFilter(
List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("client.id", "flink-example1");

FlinkKafkaConsumer011 fkC =
new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");

for (eventRouterRegistry record : registryList) {
System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

FlinkKafkaProducer011 fkp =
new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

inputStream.filter(msg -> msg.equals(record.getEventType()) );
//sideOutputStream.print();
inputStream.addSink(fkp).name(record.getOutputTopic());
}
}
Here am getting the following error. 

 ./bin/flink run -c firstflinkpackage.GenericStreamRouter ../../myrepository/flink001/target/flink001-1.0.jar

Starting execution of program

---------------------------

 The program finished with the following exception:


The implementation of the FilterFunction is not serializable. The object probably contains or references non serializable fields.

org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)

org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)

org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)

firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)

firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)


Looks like I should not use  record.getEventType() as this is outside of the stream. 

Is there a any way to use external variable here mainly to Generalise the process.

2) Using the Side Output method

Following code is my attempt in creating a generic way for sideoutput creation. 

I am able to create the Sink Streams based on the list (eventRouterRegistry). 

But i could not generalise the Output tag creation. 
The issue here is the output tag is fixed. 
My output tag need to be the Event Type and that needs to be in Process Function too. 

How do i implement. Should I write my own process function ?
 public static void setupRouterJobs(
List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("client.id", "flink-example1");

FlinkKafkaConsumer011 fkC =
new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
     //Even if i try to generalise OUtput tag here. How do i do it inside ProcessFunction
     final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
SingleOutputStreamOperator<String> mainDataStream =
inputStream.process(
new ProcessFunction<String, String>() {

@Override
public void processElement(String value, Context ctx, Collector<String> out)
throws Exception {
// emit data to side output
ctx.output(OutputTag, value);
}
});

for (eventRouterRegistry record : registryList) {
System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

FlinkKafkaProducer011 fkp =
new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
sideOutputStream.print();
sideOutputStream.addSink(fkp).name(record.getOutputTopic());
}
}

Thanks,
Prasanna.



On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <[hidden email]> wrote:
Alexander, 

Thanks for the reply. Will implement and come back in case of any questions. 

Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <[hidden email]> wrote:
Hi Prasanna,

if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. Side output produces a stream. Create all of the side output tags, associate each of them with one sink, add conditional logic around `ctx.output(outputTag, ... );`  to decide where to dispatch the messages  (see [1]), collect to none or many side outputs, depending on your logic.

--

Alexander Fedulov | Solutions Architect


image.png













Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time



On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <[hidden email]> wrote:
Piotr, 

There is an event and subscriber registry as JSON file which has the table event mapping and event-subscriber mapping as mentioned below.

Based on the set JSON , we need to job to go through the table updates and create events and for each event there is a way set how to sink them.

The sink streams have to be added based on this JSON. Thats what i mentioned as no predefined sink in code earlier.

You could see that each event has different set of sinks. 

Just checking how much generic could Side-output streams be ?.

Source -> generate events -> (find out sinks dynamically in code ) -> write to the respective sinks. 

{
  " tablename ": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            {
              "columnname": "name"
            },
            {
              "columnname": "loginenabled",
              "value": "Y"
            }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": {
                "folder": "aws://folderC2"
              }}, ]}]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
          "eventname": "USERINSERT",
          "operation": "insert",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": {
                "topicname": "new_users"
              }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password":  "pwd"
              }}, ]}]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": {
                "topicname": "deleterecords"
             }}, ]}]
     },
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure if I fully understand what do you mean by

> The point is the sink are not predefined.

You must know before submitting the job, what sinks are going to be used in the job. You can have some custom logic, that would filter out records before writing them to the sinks, as I proposed before, or you could use side outputs [1] would be better suited to your use case? 

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On 26 May 2020, at 12:20, Prasanna kumar <[hidden email]> wrote:

Thanks Piotr for the Reply. 

I will explain my requirement in detail. 

Table Updates -> Generate Business Events -> Subscribers 

Source Side
There are CDC of 100 tables which the framework needs to listen to. 

Event Table Mapping

There would be Event associated with table in a m:n fashion. 

say there are tables TA, TB, TC. 

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC 

Event Sink Mapping

EA has following sinks. kafka topic SA,SA2,SAC. 
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC. 

The point is the sink are not predefined. [. But i only see the example online where , flink code having explicit myStream.addSink(sink2);   ]

We expect around 500 types of events in our platform in another 2 years time. 

We are looking at writing a generic job for the same , rather than writing one for new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
>
> Hi,
>
> There is a single source of events for me in my system.
>
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>
> I am able send to one sink.
>
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings. 
>
> Please let me know if any one here has successfully implemented one .
>
> Thanks,
> Prasanna.



Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Piotr Nowojski-3
Hi Prasanna,

1.

> The object probably contains or references non serializable fields.

That should speak for itself. Flink was not able to distribute your code to the worker nodes. 

You have used a lambda function that turned out to be non serialisable. You should unit test your code and in this case add a serialisation/deserialisation round trip unit test for the filter function. For starters I would suggest to not use lambda function, but a full/proper named class and work from there.

2.

Can not you create an array/map/collection of OutputTags corresponding to the the sinks/topics combinations. One OutputTag per sink(/topic) and use this array/map/collection inside your process function?

Piotrek 

On 2 Jun 2020, at 13:49, Prasanna kumar <[hidden email]> wrote:

Hi ,

I have a Event router Registry as this. By reading this as input i need to create a Job which would redirect the messages to the correct sink as per condition.
{
"eventRouterRegistry": [
{ "eventType": "biling", "outputTopic": "billing" },
{ "eventType": "cost", "outputTopic": "cost" },
{ "eventType": "marketing", "outputTopic": "marketing" }
]
}
I tried the following two approaches. 
1) Using the Filter method

public static void setupRouterJobsFilter(
List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("client.id", "flink-example1");

FlinkKafkaConsumer011 fkC =
new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");

for (eventRouterRegistry record : registryList) {
System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

FlinkKafkaProducer011 fkp =
new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

inputStream.filter(msg -> msg.equals(record.getEventType()) );
//sideOutputStream.print();
inputStream.addSink(fkp).name(record.getOutputTopic());
}
}
Here am getting the following error. 
 ./bin/flink run -c firstflinkpackage.GenericStreamRouter ../../myrepository/flink001/target/flink001-1.0.jar
Starting execution of program
---------------------------
 The program finished with the following exception:

The implementation of the FilterFunction is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)

Looks like I should not use  record.getEventType() as this is outside of the stream. 

Is there a any way to use external variable here mainly to Generalise the process.

2) Using the Side Output method

Following code is my attempt in creating a generic way for sideoutput creation. 

I am able to create the Sink Streams based on the list (eventRouterRegistry). 

But i could not generalise the Output tag creation. 
The issue here is the output tag is fixed. 
My output tag need to be the Event Type and that needs to be in Process Function too. 

How do i implement. Should I write my own process function ?
 public static void setupRouterJobs(
List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("client.id", "flink-example1");

FlinkKafkaConsumer011 fkC =
new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
     //Even if i try to generalise OUtput tag here. How do i do it inside ProcessFunction
     final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
SingleOutputStreamOperator<String> mainDataStream =
inputStream.process(
new ProcessFunction<String, String>() {

@Override
public void processElement(String value, Context ctx, Collector<String> out)
throws Exception {
// emit data to side output
ctx.output(OutputTag, value);
}
});

for (eventRouterRegistry record : registryList) {
System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

FlinkKafkaProducer011 fkp =
new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
sideOutputStream.print();
sideOutputStream.addSink(fkp).name(record.getOutputTopic());
}
}

Thanks,
Prasanna.



On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <[hidden email]> wrote:
Alexander, 

Thanks for the reply. Will implement and come back in case of any questions. 

Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <[hidden email]> wrote:
Hi Prasanna,

if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. Side output produces a stream. Create all of the side output tags, associate each of them with one sink, add conditional logic around `ctx.output(outputTag, ... );`  to decide where to dispatch the messages  (see [1]), collect to none or many side outputs, depending on your logic.

--
Alexander Fedulov | Solutions Architect

<image.png>













Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time


On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <[hidden email]> wrote:
Piotr, 

There is an event and subscriber registry as JSON file which has the table event mapping and event-subscriber mapping as mentioned below.

Based on the set JSON , we need to job to go through the table updates and create events and for each event there is a way set how to sink them.

The sink streams have to be added based on this JSON. Thats what i mentioned as no predefined sink in code earlier.

You could see that each event has different set of sinks. 

Just checking how much generic could Side-output streams be ?.

Source -> generate events -> (find out sinks dynamically in code ) -> write to the respective sinks. 

{
  " tablename ": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            {
              "columnname": "name"
            },
            {
              "columnname": "loginenabled",
              "value": "Y"
            }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": {
                "folder": "<a href="aws://folderC2" class="">aws://folderC2"
              }}, ]}]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
          "eventname": "USERINSERT",
          "operation": "insert",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": {
                "topicname": "new_users"
              }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password":  "pwd"
              }}, ]}]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": {
                "topicname": "deleterecords"
             }}, ]}]
     },
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure if I fully understand what do you mean by

> The point is the sink are not predefined.

You must know before submitting the job, what sinks are going to be used in the job. You can have some custom logic, that would filter out records before writing them to the sinks, as I proposed before, or you could use side outputs [1] would be better suited to your use case? 

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On 26 May 2020, at 12:20, Prasanna kumar <[hidden email]> wrote:

Thanks Piotr for the Reply. 

I will explain my requirement in detail. 

Table Updates -> Generate Business Events -> Subscribers 

Source Side
There are CDC of 100 tables which the framework needs to listen to. 

Event Table Mapping

There would be Event associated with table in a m:n fashion. 

say there are tables TA, TB, TC. 

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC 

Event Sink Mapping

EA has following sinks. kafka topic SA,SA2,SAC. 
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC. 

The point is the sink are not predefined. [. But i only see the example online where , flink code having explicit myStream.addSink(sink2);   ]

We expect around 500 types of events in our platform in another 2 years time. 

We are looking at writing a generic job for the same , rather than writing one for new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
> 
> Hi,
> 
> There is a single source of events for me in my system. 
> 
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
> 
> I am able send to one sink.
> 
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings.  
> 
> Please let me know if any one here has successfully implemented one .
> 
> Thanks,
> Prasanna.
Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Prasanna kumar
Piotr and Alexander , 

I have fixed the programmatic error in filter method and it is working now.

Thanks for the detailed help from both of you. Am able to add the sinks based on the JSON and create DAG.

Thanks,
Prasanna.

On Wed, Jun 3, 2020 at 4:51 PM Piotr Nowojski <[hidden email]> wrote:
Hi Prasanna,

1.

> The object probably contains or references non serializable fields.

That should speak for itself. Flink was not able to distribute your code to the worker nodes. 

You have used a lambda function that turned out to be non serialisable. You should unit test your code and in this case add a serialisation/deserialisation round trip unit test for the filter function. For starters I would suggest to not use lambda function, but a full/proper named class and work from there.

2.

Can not you create an array/map/collection of OutputTags corresponding to the the sinks/topics combinations. One OutputTag per sink(/topic) and use this array/map/collection inside your process function?

Piotrek 

On 2 Jun 2020, at 13:49, Prasanna kumar <[hidden email]> wrote:

Hi ,

I have a Event router Registry as this. By reading this as input i need to create a Job which would redirect the messages to the correct sink as per condition.
{
"eventRouterRegistry": [
{ "eventType": "biling", "outputTopic": "billing" },
{ "eventType": "cost", "outputTopic": "cost" },
{ "eventType": "marketing", "outputTopic": "marketing" }
]
}
I tried the following two approaches. 
1) Using the Filter method

public static void setupRouterJobsFilter(
List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("client.id", "flink-example1");

FlinkKafkaConsumer011 fkC =
new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");

for (eventRouterRegistry record : registryList) {
System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

FlinkKafkaProducer011 fkp =
new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

inputStream.filter(msg -> msg.equals(record.getEventType()) );
//sideOutputStream.print();
inputStream.addSink(fkp).name(record.getOutputTopic());
}
}
Here am getting the following error. 
 ./bin/flink run -c firstflinkpackage.GenericStreamRouter ../../myrepository/flink001/target/flink001-1.0.jar
Starting execution of program
---------------------------
 The program finished with the following exception:

The implementation of the FilterFunction is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)

Looks like I should not use  record.getEventType() as this is outside of the stream. 

Is there a any way to use external variable here mainly to Generalise the process.

2) Using the Side Output method

Following code is my attempt in creating a generic way for sideoutput creation. 

I am able to create the Sink Streams based on the list (eventRouterRegistry). 

But i could not generalise the Output tag creation. 
The issue here is the output tag is fixed. 
My output tag need to be the Event Type and that needs to be in Process Function too. 

How do i implement. Should I write my own process function ?
 public static void setupRouterJobs(
List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("client.id", "flink-example1");

FlinkKafkaConsumer011 fkC =
new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
     //Even if i try to generalise OUtput tag here. How do i do it inside ProcessFunction
     final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
SingleOutputStreamOperator<String> mainDataStream =
inputStream.process(
new ProcessFunction<String, String>() {

@Override
public void processElement(String value, Context ctx, Collector<String> out)
throws Exception {
// emit data to side output
ctx.output(OutputTag, value);
}
});

for (eventRouterRegistry record : registryList) {
System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

FlinkKafkaProducer011 fkp =
new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
sideOutputStream.print();
sideOutputStream.addSink(fkp).name(record.getOutputTopic());
}
}

Thanks,
Prasanna.



On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <[hidden email]> wrote:
Alexander, 

Thanks for the reply. Will implement and come back in case of any questions. 

Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <[hidden email]> wrote:
Hi Prasanna,

if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. Side output produces a stream. Create all of the side output tags, associate each of them with one sink, add conditional logic around `ctx.output(outputTag, ... );`  to decide where to dispatch the messages  (see [1]), collect to none or many side outputs, depending on your logic.

--
Alexander Fedulov | Solutions Architect

<image.png>













Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time


On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <[hidden email]> wrote:
Piotr, 

There is an event and subscriber registry as JSON file which has the table event mapping and event-subscriber mapping as mentioned below.

Based on the set JSON , we need to job to go through the table updates and create events and for each event there is a way set how to sink them.

The sink streams have to be added based on this JSON. Thats what i mentioned as no predefined sink in code earlier.

You could see that each event has different set of sinks. 

Just checking how much generic could Side-output streams be ?.

Source -> generate events -> (find out sinks dynamically in code ) -> write to the respective sinks. 

{
  " tablename ": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            {
              "columnname": "name"
            },
            {
              "columnname": "loginenabled",
              "value": "Y"
            }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": {
                "folder": "aws://folderC2"
              }}, ]}]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
          "eventname": "USERINSERT",
          "operation": "insert",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": {
                "topicname": "new_users"
              }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password":  "pwd"
              }}, ]}]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": {
                "topicname": "deleterecords"
             }}, ]}]
     },
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure if I fully understand what do you mean by

> The point is the sink are not predefined.

You must know before submitting the job, what sinks are going to be used in the job. You can have some custom logic, that would filter out records before writing them to the sinks, as I proposed before, or you could use side outputs [1] would be better suited to your use case? 

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On 26 May 2020, at 12:20, Prasanna kumar <[hidden email]> wrote:

Thanks Piotr for the Reply. 

I will explain my requirement in detail. 

Table Updates -> Generate Business Events -> Subscribers 

Source Side
There are CDC of 100 tables which the framework needs to listen to. 

Event Table Mapping

There would be Event associated with table in a m:n fashion. 

say there are tables TA, TB, TC. 

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC 

Event Sink Mapping

EA has following sinks. kafka topic SA,SA2,SAC. 
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC. 

The point is the sink are not predefined. [. But i only see the example online where , flink code having explicit myStream.addSink(sink2);   ]

We expect around 500 types of events in our platform in another 2 years time. 

We are looking at writing a generic job for the same , rather than writing one for new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
> 
> Hi,
> 
> There is a single source of events for me in my system. 
> 
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
> 
> I am able send to one sink.
> 
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings.  
> 
> Please let me know if any one here has successfully implemented one .
> 
> Thanks,
> Prasanna.
Reply | Threaded
Open this post in threaded view
|

Re: Multiple Sinks for a Single Soure

Piotr Nowojski-3
Hi Prasanna,

That’s good to hear and thanks for confirming that it works :)

Piotrek

On 3 Jun 2020, at 16:09, Prasanna kumar <[hidden email]> wrote:

Piotr and Alexander , 

I have fixed the programmatic error in filter method and it is working now.

Thanks for the detailed help from both of you. Am able to add the sinks based on the JSON and create DAG.

Thanks,
Prasanna.

On Wed, Jun 3, 2020 at 4:51 PM Piotr Nowojski <[hidden email]> wrote:
Hi Prasanna,

1.

> The object probably contains or references non serializable fields.

That should speak for itself. Flink was not able to distribute your code to the worker nodes. 

You have used a lambda function that turned out to be non serialisable. You should unit test your code and in this case add a serialisation/deserialisation round trip unit test for the filter function. For starters I would suggest to not use lambda function, but a full/proper named class and work from there.

2.

Can not you create an array/map/collection of OutputTags corresponding to the the sinks/topics combinations. One OutputTag per sink(/topic) and use this array/map/collection inside your process function?

Piotrek 

On 2 Jun 2020, at 13:49, Prasanna kumar <[hidden email]> wrote:

Hi ,

I have a Event router Registry as this. By reading this as input i need to create a Job which would redirect the messages to the correct sink as per condition.
{
"eventRouterRegistry": [
{ "eventType": "biling", "outputTopic": "billing" },
{ "eventType": "cost", "outputTopic": "cost" },
{ "eventType": "marketing", "outputTopic": "marketing" }
]
}
I tried the following two approaches. 
1) Using the Filter method

public static void setupRouterJobsFilter(
List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("client.id", "flink-example1");

FlinkKafkaConsumer011 fkC =
new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");

for (eventRouterRegistry record : registryList) {
System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

FlinkKafkaProducer011 fkp =
new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

inputStream.filter(msg -> msg.equals(record.getEventType()) );
//sideOutputStream.print();
inputStream.addSink(fkp).name(record.getOutputTopic());
}
}
Here am getting the following error. 
 ./bin/flink run -c firstflinkpackage.GenericStreamRouter ../../myrepository/flink001/target/flink001-1.0.jar
Starting execution of program
---------------------------
 The program finished with the following exception:

The implementation of the FilterFunction is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)

Looks like I should not use  record.getEventType() as this is outside of the stream. 

Is there a any way to use external variable here mainly to Generalise the process.

2) Using the Side Output method

Following code is my attempt in creating a generic way for sideoutput creation. 

I am able to create the Sink Streams based on the list (eventRouterRegistry). 

But i could not generalise the Output tag creation. 
The issue here is the output tag is fixed. 
My output tag need to be the Event Type and that needs to be in Process Function too. 

How do i implement. Should I write my own process function ?
 public static void setupRouterJobs(
List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("client.id", "flink-example1");

FlinkKafkaConsumer011 fkC =
new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);

DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
     //Even if i try to generalise OUtput tag here. How do i do it inside ProcessFunction
     final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
SingleOutputStreamOperator<String> mainDataStream =
inputStream.process(
new ProcessFunction<String, String>() {

@Override
public void processElement(String value, Context ctx, Collector<String> out)
throws Exception {
// emit data to side output
ctx.output(OutputTag, value);
}
});

for (eventRouterRegistry record : registryList) {
System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());

FlinkKafkaProducer011 fkp =
new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
sideOutputStream.print();
sideOutputStream.addSink(fkp).name(record.getOutputTopic());
}
}

Thanks,
Prasanna.



On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <[hidden email]> wrote:
Alexander, 

Thanks for the reply. Will implement and come back in case of any questions. 

Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <[hidden email]> wrote:
Hi Prasanna,

if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. Side output produces a stream. Create all of the side output tags, associate each of them with one sink, add conditional logic around `ctx.output(outputTag, ... );`  to decide where to dispatch the messages  (see [1]), collect to none or many side outputs, depending on your logic.

--
Alexander Fedulov | Solutions Architect

<image.png>













Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time


On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <[hidden email]> wrote:
Piotr, 

There is an event and subscriber registry as JSON file which has the table event mapping and event-subscriber mapping as mentioned below.

Based on the set JSON , we need to job to go through the table updates and create events and for each event there is a way set how to sink them.

The sink streams have to be added based on this JSON. Thats what i mentioned as no predefined sink in code earlier.

You could see that each event has different set of sinks. 

Just checking how much generic could Side-output streams be ?.

Source -> generate events -> (find out sinks dynamically in code ) -> write to the respective sinks. 

{
  " tablename ": "source.table1",
  "events": [
    {
      "operation": "update",
      "eventstobecreated": [
        {
          "eventname": "USERUPDATE",
          "Columnoperation": "and",
          "ColumnChanges": [
            {
              "columnname": "name"
            },
            {
              "columnname": "loginenabled",
              "value": "Y"
            }
          ],
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c2",
              "method": "S3",
              "methodparams": {
                "folder": "aws://folderC2"
              }}, ]}]
    },
    {
      "operation": "insert",
      "eventstobecreated": [
          "eventname": "USERINSERT",
          "operation": "insert",
          "Subscribers": [
            {
              "teamname": "General",
              "method": "Kafka",
              "methodparams": {
                "topicname": "new_users"
              }
            },
            {
              "teamname": "General",
              "method": "kinesis",
              "methodparams": {
                "URL": "new_users",
                "username": "uname",
                "password":  "pwd"
              }}, ]}]
    },
    {
      "operation": "delete",
      "eventstobecreated": [
        {
          "eventname": "USERDELETE",
          "Subscribers": [
            {
              "customername": "c1",
              "method": "Kafka",
              "methodparams": {
                "topicname": "USERTOPIC"
              }
            },
            {
              "customername": "c4",
              "method": "Kafka",
              "methodparams": {
                "topicname": "deleterecords"
             }}, ]}]
     },
}

Please let me know your thoughts on this.

Thanks,
Prasanna.

On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure if I fully understand what do you mean by

> The point is the sink are not predefined.

You must know before submitting the job, what sinks are going to be used in the job. You can have some custom logic, that would filter out records before writing them to the sinks, as I proposed before, or you could use side outputs [1] would be better suited to your use case? 

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On 26 May 2020, at 12:20, Prasanna kumar <[hidden email]> wrote:

Thanks Piotr for the Reply. 

I will explain my requirement in detail. 

Table Updates -> Generate Business Events -> Subscribers 

Source Side
There are CDC of 100 tables which the framework needs to listen to. 

Event Table Mapping

There would be Event associated with table in a m:n fashion. 

say there are tables TA, TB, TC. 

EA, EA2 and EA3 are generated from TA (based on conditions)
EB generated from TB (based on conditions)
EC generated from TC (no conditions.)

Say there are events EA,EB,EC generated from the tables TA, TB, TC 

Event Sink Mapping

EA has following sinks. kafka topic SA,SA2,SAC. 
EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
EC has only rest endpoint RC. 

The point is the sink are not predefined. [. But i only see the example online where , flink code having explicit myStream.addSink(sink2);   ]

We expect around 500 types of events in our platform in another 2 years time. 

We are looking at writing a generic job for the same , rather than writing one for new case.

Let me know your thoughts and flink suitability to this requirement.

Thanks
Prasanna.


On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:


DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you wish. `foo` and `bar` would be applied once to the stream, before it’s going to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only of the sinks AFTER splitting.

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 only a portion of them.

Piotrek 


On 26 May 2020, at 06:45, Prasanna kumar <[hidden email]> wrote:

Piotr, 

Thanks for the reply. 

There is one other case, where some events have to be written to multiple sinks and while other have to be written to just one sink. 

How could i have a common codeflow/DAG for the same ?

I do not want multiple jobs to do the same want to accomplish in a single job .

Could i add Stream code "myStream.addSink(sink1)" under a conditional operator such as 'if' to determine . 

But i suppose here the stream works differently compared to normal code processing.

Prasanna.


On Mon 25 May, 2020, 23:37 Piotr Nowojski, <[hidden email]> wrote:
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar() // for custom source, but any ;
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each of the sinks.

Piotrek

> On 24 May 2020, at 19:34, Prasanna kumar <[hidden email]> wrote:
> 
> Hi,
> 
> There is a single source of events for me in my system. 
> 
> I need to process and send the events to multiple destination/sink at the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
> 
> I am able send to one sink.
> 
> By adding more sink stream to the source stream could we achieve it . Are there any shortcomings.  
> 
> Please let me know if any one here has successfully implemented one .
> 
> Thanks,
> Prasanna.