Dynamic source and sink.


dinesh
Hi All,

In a Flink job I have a pipeline that consumes data from one Kafka topic and writes it to an Elasticsearch cluster.

Without restarting the job, can we add another Kafka cluster as a source and another Elasticsearch sink to the job? That is, I would supply the new Kafka cluster and Elasticsearch details in a topic; after consuming that data, can our Flink job add the new source and sink to itself?


Thanks & Regards,
Dinesh.

Re: Dynamic source and sink.

Danny Chan
Sorry, a job graph is fixed once it is compiled and submitted to the cluster; it is not dynamic in the way you want.

You could write wrapper operators that respond to your own RPCs and run the appended operators you want, but then you have to maintain the consistency semantics yourself.
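
For illustration, a rough sketch of such a wrapper sink. This is not a Flink API: the Writer abstraction and the swap() hook are made up, and the swap would have to be triggered from your own RPC/control handler.

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch: a sink that forwards each record to a replaceable delegate
// writer. Flink is unaware of the swap, so checkpointing and
// exactly-once guarantees are NOT preserved by this pattern.
public class SwappableSink<T> extends RichSinkFunction<T> {

    // Minimal writer abstraction; a real one would wrap e.g. an ES client.
    public interface Writer<V> {
        void write(V value) throws Exception;
        void close() throws Exception;
    }

    private transient volatile Writer<T> delegate;

    // Called from your own RPC / control-message handler, not by Flink.
    public void swap(Writer<T> newDelegate) throws Exception {
        Writer<T> old = this.delegate;
        this.delegate = newDelegate;
        if (old != null) {
            old.close();
        }
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        Writer<T> w = delegate;
        if (w != null) {
            w.write(value);
        }
    }
}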

Best,
Danny Chan

Re: Dynamic source and sink.

dinesh
Hi Danny,

Thanks for the response.

In short, without restarting we cannot add new sinks or sources.

Let me explain my problem in more detail.

My scenario: I have two topics, one for configuration and one for event activities.

* From the configuration topic I get the Kafka cluster and Elasticsearch cluster details.
* From the event-activities topic I get events, and each event carries a tenantId.
* When data for a new tenantId arrives, I need to send it to the respective Elasticsearch cluster, which I only learn about at runtime from the configuration topic.
* Is there a way to add a new Elasticsearch sink to the same job without restarting?

Before starting the job I can create two Elasticsearch sinks and route the data to the respective cluster with a condition, as in the sketch below. Is there a way to do this at runtime?
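
For concreteness, the static two-sink routing I mean looks roughly like this, using side outputs. Event, events, esSinkForClusterA/B and the tenant check are all placeholders for my real code.

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// events is an existing DataStream<Event>; both ES sinks are created
// before the job starts, so the set of clusters is fixed at submit time.
final OutputTag<Event> clusterB = new OutputTag<Event>("cluster-b") {};

SingleOutputStreamOperator<Event> routed = events
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event e, Context ctx, Collector<Event> out) {
            if ("tenant-b".equals(e.tenantId)) {
                ctx.output(clusterB, e);   // side output -> cluster B's sink
            } else {
                out.collect(e);            // main output -> cluster A's sink
            }
        }
    });

routed.addSink(esSinkForClusterA);
routed.getSideOutput(clusterB).addSink(esSinkForClusterB);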


Thanks and Regards,
Dinesh.



Re: Dynamic source and sink.

hzyuemeng1
Hi All,

I also have a scenario that needs a dynamic source and dynamic sink to route streaming data to different Kafka clusters. Is there a better way to do this at runtime?
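
If the target topics all live in one Kafka cluster, the universal FlinkKafkaProducer can already route per record through its serialization schema. A sketch, assuming an Event POJO with tenantId and payload String fields (the routing rule is made up):

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Per-record topic routing within ONE Kafka cluster: the schema picks
// the target topic for each element.
public class RoutingSchema implements KafkaSerializationSchema<Event> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Event e, Long timestamp) {
        String topic = "events-" + e.tenantId;  // hypothetical routing rule
        return new ProducerRecord<>(
            topic, e.payload.getBytes(StandardCharsets.UTF_8));
    }
}

You pass the schema to FlinkKafkaProducer together with a default topic and producer Properties. Routing across *different* Kafka clusters would still need a custom sink that keeps one producer per cluster, with the same compile-time caveats discussed above.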

Re: Dynamic source and sink.

Paul Lam
Hi Dinesh,

I think the problem you're facing is quite common.

But with the current Flink architecture, operators must be determined at compile time (when you submit your job). This is by design, IIUC. If operators were changeable, Flink would need to go through the compile-optimize-schedule phases each time a new operator is added, which would be little different from restarting the job.

I see two alternative solutions, FYI:

1. Implement a custom sink function, as Danny suggested, that dynamically creates a new client for the respective ES cluster on receiving a new tenant configuration (see the sketch after this list).
2. Still restart the job, and reduce the downtime by using session mode.
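
A minimal sketch of option 1, assuming an upstream operator has already joined each event with its cluster address from the configuration topic. TaggedEvent, EsClient and the factory are placeholders, not a real client API:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// One sink instance lazily opens a client per tenant the first time
// that tenant shows up; the job graph itself never changes.
public class MultiTenantEsSink extends RichSinkFunction<MultiTenantEsSink.TaggedEvent> {

    public static class TaggedEvent {
        public String tenantId;
        public String clusterAddress;  // resolved from the config topic
        public String payload;
    }

    // Stand-ins for your real Elasticsearch client wrapper.
    public interface EsClient {
        void index(String payload) throws Exception;
        void close() throws Exception;
    }

    public interface EsClientFactory extends java.io.Serializable {
        EsClient connect(String clusterAddress);
    }

    private final EsClientFactory factory;
    private transient Map<String, EsClient> clients;

    public MultiTenantEsSink(EsClientFactory factory) {
        this.factory = factory;
    }

    @Override
    public void open(Configuration parameters) {
        clients = new HashMap<>();
    }

    @Override
    public void invoke(TaggedEvent event, Context ctx) throws Exception {
        // Lazily create one client per tenant on first sight.
        EsClient client = clients.computeIfAbsent(
            event.tenantId, id -> factory.connect(event.clusterAddress));
        client.index(event.payload);
    }

    @Override
    public void close() throws Exception {
        if (clients != null) {
            for (EsClient c : clients.values()) {
                c.close();
            }
        }
    }
}

Note that clients created this way are invisible to Flink's checkpointing, so, as Danny said, the consistency guarantees are up to you.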

Best,
Paul Lam


Re: Dynamic source and sink.

dinesh
Hi Paul,

Thanks for the response.

Could you point me to an example of how to create a dynamic client or wrapper operator?


Thanks and Regards,
Dinesh.



