Multi-stream SQL-like processing

Multi-stream SQL-like processing

Krzysztof Zarzycki
Hi community, I would like to run an idea by you. 

I was thinking that Flink SQL could be Flink's answer to Kafka Connect (more powerful, with advantages like being decoupled from Kafka). Flink SQL would be the configuration language for Flink "connectors", which sounds great! 
But one thing prevents me from implementing this idea: there is no way to run SQL-based processing over multiple similar inputs and produce multiple similar outputs (counted in tens or hundreds). 
As an example of a problem I need to solve, consider that I have a hundred Kafka topics with similar data in each, and I would like to sink them to a SQL database. With Kafka Connect, I can use a single JDBC sink connector that, properly configured, will dump each topic to a separate table while preserving the schema (based on what is in the schema registry). 
With Flink SQL I would need to run a query per topic/table, I believe.
It is similar with sourcing data. There is this cool project, flink-cdc-connectors [1], that leverages Debezium in Flink to apply CDC to a SQL database, but when used with SQL, it can only pull in one table per query.  
These cases can be solved using the DataStream API; with it I can write code that pulls in/pushes out multiple table streams. But then "the configuration" is a much bigger effort, because it requires writing Java code. And that is a few hours vs. a few days, an enormous difference. 
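To illustrate, a rough DataStream-level sketch of that Java route could look like the following (the topic pattern, broker address and group id below are made up, and the per-table JDBC writing is only indicated by a comment):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Properties;
import java.util.regex.Pattern;

public class MultiTopicIngestSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");   // made-up address
        props.setProperty("group.id", "multi-topic-ingest");    // made-up group id
        // Periodically re-discover partitions, so new topics matching the pattern are picked up.
        props.setProperty("flink.partition-discovery.interval-millis", "60000");

        // One consumer subscribed to all topics matching a regex, keeping the topic name per record.
        FlinkKafkaConsumer<Tuple2<String, byte[]>> consumer = new FlinkKafkaConsumer<Tuple2<String, byte[]>>(
                Pattern.compile("ingest\\..*"),                  // made-up topic pattern
                new KafkaDeserializationSchema<Tuple2<String, byte[]>>() {
                    @Override
                    public boolean isEndOfStream(Tuple2<String, byte[]> next) {
                        return false;
                    }

                    @Override
                    public Tuple2<String, byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) {
                        return Tuple2.of(record.topic(), record.value());
                    }

                    @Override
                    public TypeInformation<Tuple2<String, byte[]>> getProducedType() {
                        return TypeInformation.of(new TypeHint<Tuple2<String, byte[]>>() {});
                    }
                },
                props);

        env.addSource(consumer)
                // A real job would deserialize the value using the schema registry and hand each
                // record to a JDBC writer for the table derived from the topic name; printing
                // here just keeps the sketch self-contained.
                .print();

        env.execute("multi-topic-ingest-sketch");
    }
}

That is not a lot of code per se, but maintaining and reconfiguring it is what makes it a project rather than a configuration change.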

So in the end some questions: 
* Do you know how SQL could be extended to support handling such cases elegantly, with a single job in the end? 
* Or do you believe SQL should not be used for this case, and we should come up with a different tool and configuration language, i.e. something like Kafka Connect?
* Do you know of any other project that implements this idea?

I definitely believe this is a great use case for Flink as an easy-to-use ingress from/egress to Kafka/HDFS/whatever system, so there is a need for a solution to my case.

Thanks for answering! 
Krzysztof 

Re: Multi-stream SQL-like processing

Jark Wu-3
Hi Krzysztof,

This is a very interesting idea. 

I think SQL is not a suitable tool for this use case, because SQL is a structured query language where the table schema is fixed and never changes while the job is running. 

However, I think this could be a configuration tool project built on top of Flink SQL. 
The configuration tool can dynamically generate all the queries according to the config and submit them as one job.

For example, if the configuration says "synchronize from MySQL address 'xxxx' to Kafka broker 'yyyy'", then the generated Flink SQL could look like:

CREATE TABLE db (
  `database_name` STRING,
  `table_name` STRING,
  `data` BYTES  -- encodes all the column values; could be a better structure for performance
) WITH (
  'connector' = '...',  -- a new connector that scans all tables from the MySQL address
  'url' = 'jdbc:mysql://localhost:3306/flink-test'
);

-- the configuration tool will generate one INSERT INTO per table in the database
INSERT INTO kafka_table1
SELECT parse_data(table_name, data)  -- the parse_data UDF infers the schema from the database
FROM db WHERE table_name = 'table1'  -- or the schema registry, and deserializes the data into columns with the proper types

INSERT INTO kafka_table2
SELECT parse_data(table_name, data)
FROM db WHERE table_name = 'table2'

...

The configuration tool can use `StatementSet` to package all the INSERT INTO queries together and submit them as one job. 
With `StatementSet`, the job will share the common source task, so the tables in MySQL are read only once. 
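A minimal Java sketch of such a tool (assuming the table list comes from the config; the placeholder connector and the parse_data UDF are the same hypothetical pieces as above, and registering the UDF and the kafka_* sink tables is omitted):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

import java.util.List;

public class SyncJobGenerator {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The single multi-table source, as in the CREATE TABLE statement above.
        tEnv.executeSql(
                "CREATE TABLE db (" +
                "  `database_name` STRING," +
                "  `table_name` STRING," +
                "  `data` BYTES" +
                ") WITH (" +
                "  'connector' = '...'," +   // placeholder connector, as above
                "  'url' = 'jdbc:mysql://localhost:3306/flink-test'" +
                ")");

        // In the real tool, the table list would be read from the configuration.
        List<String> tables = List.of("table1", "table2");

        // Generate one INSERT INTO per table and package them into a single job.
        StatementSet statementSet = tEnv.createStatementSet();
        for (String table : tables) {
            statementSet.addInsertSql(
                    "INSERT INTO kafka_" + table +
                    " SELECT parse_data(table_name, data)" +
                    " FROM db WHERE table_name = '" + table + "'");
        }
        statementSet.execute();  // all statements share the common source and run as one Flink job
    }
}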

Best,
Jark

Re: Multi-stream SQL-like processing

Krzysztof Zarzycki
Hi Jark, thanks for joining the discussion!
I understand your point of view that the SQL environment is probably not the best fit for what I am looking to achieve.
The idea of a configuration tool sounds almost perfect :) Almost, because:
* Without the `StatementSet` that you mentioned at the end, I would be worried about the resource consumption (job & task manager objects, buffers, connections) of having one topology per table. That would be a significant loss against a Kafka Connect-style architecture. 
* With `StatementSet` I understand this is not the case, but there is another issue: we lose the dynamism. That is, the job won't be able to discover new tables; we would always need to restart the whole (reconfigured) StatementSet job. (Anyway, this approach sounds good enough to try out in my current assignment.)
* The other issue I see is that I still need to define the DSL for the configuration (something like the Kafka Connect config). SQL will not be it; it will probably be merely the means of implementing the tool. A rough sketch of what I mean by such a configuration follows below. 
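Purely as a hypothetical illustration (all names below are made up, this is not an existing tool), the configuration could be as small as:

// A hypothetical configuration object for such a tool; every field name here is made up.
public class SyncConfig {
    String sourceJdbcUrl;        // e.g. "jdbc:mysql://localhost:3306/flink-test"
    String tableIncludeRegex;    // which tables to synchronize, e.g. "orders_.*"
    String targetKafkaBrokers;   // e.g. "kafka:9092"
    String topicNameFormat;      // e.g. "ingest.%s", filled in with the table name
}

The tool would then expand something like this into the generated DDL and INSERT statements.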

I would appreciate your comments, Jark. 
Also if anyone would like to add other ideas, feel welcome!   

Best,
Krzysztof

Re: Multi-stream SQL-like processing

Jark Wu-3
Yes. The dynamism might be a problem. 
Does Kafka Connect support discovering new tables and synchronizing them dynamically?

Best,
Jark

Re: Multi-stream SQL-like processing

Krzysztof Zarzycki
Yes,
Kafka Connect supports the topics.regex option for sink connectors. The connector automatically discovers new topics that match the regex pattern. 
It's similar with source connectors, which discover tables in a SQL database and save them to Kafka topics. 
