Multiple select queries in single job on flink table API

Multiple select queries in single job on flink table API

Tejas
If I want to run two different select queries on a Flink table created from
a DataStream, the Blink planner runs them as two different jobs. Is there
a way to combine them and run them as a single job? Example code:

// Static import needed for the $ column expressions below:
// import static org.apache.flink.table.api.Expressions.$;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

System.out.println("Running credit scores : ");

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<String> recordsStream =
        env.readTextFile("src/main/resources/credit_trial.csv");

// Drop the CSV header line, then parse each record.
DataStream<CreditRecord> creditStream = recordsStream
        .filter((FilterFunction<String>) line -> !line.contains(
                "Loan ID,Customer ID,Loan Status,Current Loan Amount,Term,Credit Score,Annual Income,Years in current job" +
                        ",Home Ownership,Purpose,Monthly Debt,Years of Credit History,Months since last delinquent,Number of Open Accounts," +
                        "Number of Credit Problems,Current Credit Balance,Maximum Open Credit,Bankruptcies,Tax Liens"))
        .map(new MapFunction<String, CreditRecord>() {
            @Override
            public CreditRecord map(String s) throws Exception {
                String[] fields = s.split(",");
                return new CreditRecord(fields[0], fields[2],
                        Double.parseDouble(fields[3]),
                        fields[4],
                        fields[5].trim().equals("") ? 0.0 : Double.parseDouble(fields[5]),
                        fields[6].trim().equals("") ? 0.0 : Double.parseDouble(fields[6]),
                        fields[8], Double.parseDouble(fields[15]));
            }
        });

tableEnv.createTemporaryView("CreditDetails", creditStream);
Table creditDetailsTable = tableEnv.from("CreditDetails");

Table resultsTable = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Charged Off"));

// First execute() call: submits one job
TableResult result = resultsTable.execute();
result.print();

Table resultsTable2 = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Fully Paid"));

// Second execute() call: submits a second job
TableResult result2 = resultsTable2.execute();
result2.print();

The above code creates two different jobs, but I don't want that; I want it to
run as a single job. Is there any way to do this?
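As an aside, the blank-field handling repeated in the map function above can be pulled into a tiny helper. A minimal sketch, pure JDK with no Flink dependency (the class and method names here are made up for illustration):

```java
// Helper for parsing CSV fields that may be blank, as in the
// Credit Score and Annual Income columns above. Illustrative only.
public class CsvFields {

    /** Parse a CSV field as a double, treating blank fields as 0.0. */
    public static double parseOrZero(String field) {
        String trimmed = field.trim();
        return trimmed.isEmpty() ? 0.0 : Double.parseDouble(trimmed);
    }

    public static void main(String[] args) {
        System.out.println(parseOrZero(""));      // blank field -> 0.0
        System.out.println(parseOrZero(" 730 ")); // whitespace trimmed -> 730.0
    }
}
```

The map function body then reduces to calls like `CsvFields.parseOrZero(fields[5])`.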



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Multiple select queries in single job on flink table API

Yuval Itzchakov
Yes. Instead of calling execute() on each table, create a StatementSet from your StreamTableEnvironment (tableEnv.createStatementSet()), add each query with addInsert(), and finally call execute() once when you want to run the job.
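For reference, the same fan-out can also be expressed directly in Flink SQL as a statement set (the exact syntax depends on the Flink version; this sketch assumes a registered CreditDetails view and an output sink table as in the original code):

```sql
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO output SELECT id FROM CreditDetails WHERE loanStatus = 'Charged Off';
  INSERT INTO output SELECT id FROM CreditDetails WHERE loanStatus = 'Fully Paid';
END;
```

Both INSERTs are planned together and submitted as one job.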




On Sat, Apr 17, 2021, 03:20 tbud <[hidden email]> wrote:
Re: Multiple select queries in single job on flink table API

Tejas
I have tried that too. For example:

tableEnv.createTemporaryView("CreditDetails", creditStream);
tableEnv.executeSql(
    "CREATE TABLE output(loanId VARCHAR) WITH ('connector.type' = 'filesystem',"
        + "'connector.path' = 'file:///path/Downloads/1',"
        + "'format.type' = 'csv')");

Table creditDetailsTable =
    tableEnv.sqlQuery("SELECT id FROM CreditDetails WHERE loanStatus = 'Charged Off'");
Table creditDetailsTable2 =
    tableEnv.sqlQuery("SELECT id FROM CreditDetails WHERE creditScore < 700");

StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsert("output", creditDetailsTable);
stmtSet.addInsert("output", creditDetailsTable2);
stmtSet.execute();

Table outputTable1 = tableEnv.from("output");
Table resultsTable1 = outputTable1.select($("*"));
TableResult result1 = resultsTable1.execute();
result1.print();

But this prints 0 rows. What am I doing wrong?
Also, the execution graph shows two different jobs getting created: collect
and insert. Can you help me understand what's going on?



Re: Multiple select queries in single job on flink table API

Arvid Heise-4
Hi tbud,

you still have two execute() calls; there should be only one. Can you try the following instead of using outputTable1?

TableResult result1 = stmtSet.execute();
result1.print();

On Sun, Apr 18, 2021 at 12:05 AM tbud <[hidden email]> wrote:
Re: Multiple select queries in single job on flink table API

Tejas
"TableResult result1 = stmtSet.execute();
result1.print();"

I tried this, and the result is the following:

Job has been submitted with JobID 4803aa5edc31b3ddc884f922008c5c03
+--------------------------------------------+--------------------------------------------+
| default_catalog.default_database.output1_1 | default_catalog.default_database.output1_2 |
+--------------------------------------------+--------------------------------------------+
|                                         -1 |                                         -1 |
+--------------------------------------------+--------------------------------------------+
1 row in set

The output folder is created, but the files in it are empty.

When I run my queries individually, they give me 68 and 107 rows respectively.
The problem starts when I add two statements to a StatementSet.
Has anybody faced this issue? What was the solution?


