Hi,
I have a list of jobs that need to be run via Flink. For the PoC we are driving them from a JSON configuration file.
Sample JSON file:
{
  "registryJobs": [
    { "inputTopic": "ProfileTable1", "outputTopic": "Channel" },
    { "inputTopic": "Salestable", "outputTopic": "SalesChannel" },
    { "inputTopic": "billingsource", "outputTopic": "billing" },
    { "inputTopic": "costs", "outputTopic": "costschannel" },
    { "inputTopic": "leadsTable", "outputTopic": "leadsChannel" }
  ]
}
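For reference, the file maps onto two small POJOs, roughly like this (a minimal sketch; the JobRegistry wrapper class name is my own, only Registry appears in the generator code below):

import java.util.List;

// One entry of "registryJobs"
public class Registry {
    private String inputTopic;
    private String outputTopic;
    public String getInputTopic() { return inputTopic; }
    public void setInputTopic(String inputTopic) { this.inputTopic = inputTopic; }
    public String getOutputTopic() { return outputTopic; }
    public void setOutputTopic(String outputTopic) { this.outputTopic = outputTopic; }
}

// Top-level wrapper matching the whole JSON file
public class JobRegistry {
    private List<Registry> registryJobs;
    public List<Registry> getRegistryJobs() { return registryJobs; }
    public void setRegistryJobs(List<Registry> registryJobs) { this.registryJobs = registryJobs; }
}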
But in the long run we want to keep these details in an RDBMS. A job has many other properties, such as transformations, filters and rules, which would be captured in the DB via a UI.
Since Flink supports a single execution environment per submitted program, I ended up writing a JobGenerator module which reads this JSON and creates the jobs:
public static void generateJob(Registry job, StreamExecutionEnvironment env) {
    Properties props = new Properties();
    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
    props.put("client.id", "flink-example1");

    // Source: consume the job's input topic
    FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>(job.getInputTopic(), new SimpleStringSchema(), props);
    DataStream<String> stream = env.addSource(consumer).name("Kafka: " + job.getInputTopic());

    // Transform: SOMEMAPCODE stands in for the real map function
    DataStream<String> mapped = stream.map(SOMEMAPCODE);

    // Sink: write the mapped stream to the job's output topic
    mapped.addSink(new FlinkKafkaProducer011<>(job.getOutputTopic(), new SimpleStringSchema(), props))
          .name("Kafka: " + job.getOutputTopic());
}
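The driver then wires everything into one program before a single env.execute() call, roughly like this (again a sketch; the file name, Jackson usage and JobRegistry class are assumptions from our PoC setup):

import java.io.File;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Parse the registry file and build one pipeline per entry
    ObjectMapper mapper = new ObjectMapper();
    JobRegistry registry = mapper.readValue(new File("registry.json"), JobRegistry.class);
    for (Registry job : registry.getRegistryJobs()) {
        generateJob(job, env);
    }

    // All pipelines are submitted together as one Flink job
    env.execute("registry-jobs");
}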
This created 5 tasks in a single job, and that is how it appears when the job is running.
Questions
1) Is this a good way to design this? We might end up with 500-1000 such tasks a year or so down the line. Or is there another way to do it?
2) We cannot afford downtime in our system. Say 5 tasks have been pushed to production and we later need to add or update tasks; do we have to restart the cluster with the new job and JAR?
3) For now the job registry lives in files. Is it possible to read from the DB directly and create the jobs (the DAG) dynamically, without restarting?
Thanks,
Prasanna.