Re: Running and Maintaining Multiple Jobs

Posted by Prasanna kumar
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Running-and-Maintaining-Multiple-Jobs-tp35565p35566.html

Hi,

I also looked at the link below; it suggests my approach is not ideal. I wanted to hear more on this from the community.

https://stackoverflow.com/questions/52009948/multiple-jobs-or-multiple-pipelines-in-one-job-in-flink  

Prasanna.

On Thu, May 28, 2020 at 11:22 PM Prasanna kumar <[hidden email]> wrote:
Hi,

I have a list of jobs that need to be run via Flink.
For the PoC we are implementing this via a JSON configuration file.
Sample JSON file:
{
  "registryJobs": [
    { "inputTopic": "ProfileTable1",  "outputTopic": "Channel" },
    { "inputTopic": "Salestable", "outputTopic": "SalesChannel" },
    { "inputTopic": "billingsource", "outputTopic": "billing" },
    { "inputTopic": "costs", "outputTopic": "costschannel" },
    { "inputTopic": "leadsTable",  "outputTopic": "leadsChannel" },
  ]
}
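
For the PoC this file is simply deserialized into POJOs. A minimal sketch using Jackson; the class names JobRegistryLoader and RegistryFile (and the nested Registry) are illustrative, not the exact code:

import java.io.File;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JobRegistryLoader {

    // Mirrors the top-level JSON object above.
    public static class RegistryFile {
        public List<Registry> registryJobs;
    }

    // Mirrors one entry of "registryJobs"; exposes the getters used by the generator below.
    public static class Registry {
        public String inputTopic;
        public String outputTopic;

        public String getInputTopic()  { return inputTopic; }
        public String getOutputTopic() { return outputTopic; }
    }

    public static RegistryFile load(String path) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(new File(path), RegistryFile.class);
    }
}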
But in the long run we do want to keep these details in an RDBMS.
There are many other properties for a job, such as transformations, filters, and rules, which would be captured in the DB via a UI.

Flink supports a single execution environment per application, so I ended up writing a JobGenerator module that reads this JSON and creates the jobs.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public static void generateJobs(Registry job, StreamExecutionEnvironment env) {

    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
    props.put("client.id", "flink-example1");

    // Source: consume the entry's input topic
    FlinkKafkaConsumer011<String> fkC =
        new FlinkKafkaConsumer011<>(job.getInputTopic(), new SimpleStringSchema(), props);

    DataStream<String> stream = env.addSource(fkC).name("Kafka: " + job.getInputTopic());

    // Transformation: keep the mapped stream so the sink receives the transformed records
    DataStream<String> mapped = stream.map( SOMEMAPCODE );

    // Sink: produce to the entry's output topic
    mapped.addSink(new FlinkKafkaProducer011<>(job.getOutputTopic(), new SimpleStringSchema(), props))
          .name("Kafka: " + job.getOutputTopic());
}
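
For reference, the generator is driven roughly like this; the loader class and file name below (JobRegistryLoader, registry.json) are illustrative:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Load the registry once, then add one source -> map -> sink pipeline per entry
    // to the same environment.
    JobRegistryLoader.RegistryFile registry = JobRegistryLoader.load("registry.json");
    for (JobRegistryLoader.Registry job : registry.registryJobs) {
        generateJobs(job, env);
    }

    // A single submission containing all pipelines.
    env.execute("registry-jobs");
}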

This created 5 tasks in a single job, and it looks like this:

[Screenshot attachment: Screen Shot 2020-05-28 at 11.15.32 PM.png]

Questions 

1) Is this a good way to design this? We might end up with 500-1000 such tasks a year or so down the line. Is there another possible approach?

2) We cannot afford downtime in our system. Say 5 tasks are already in production and we later need to add or update tasks; do we have to restart the cluster with the new job and JAR?

3) Right now the job registry is in files. Is it possible to read from the DB directly and create the jobs (DAGs) dynamically, without restarting?

Thanks,
Prasanna.