If I want to run two different SELECT queries on a Flink table created from a DataStream, the Blink planner runs them as two different jobs. Is there a way to combine them and run them as a single job? Example code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
System.out.println("Running credit scores : ");
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<String> recordsStream =
        env.readTextFile("src/main/resources/credit_trial.csv");
DataStream<CreditRecord> creditStream = recordsStream
        // Drop the CSV header row before parsing.
        .filter((FilterFunction<String>) line -> !line.contains(
                "Loan ID,Customer ID,Loan Status,Current Loan Amount,Term,Credit Score,Annual Income,Years in current job"
                + ",Home Ownership,Purpose,Monthly Debt,Years of Credit History,Months since last delinquent,Number of Open Accounts,"
                + "Number of Credit Problems,Current Credit Balance,Maximum Open Credit,Bankruptcies,Tax Liens"))
        .map(new MapFunction<String, CreditRecord>() {
            @Override
            public CreditRecord map(String s) throws Exception {
                String[] fields = s.split(",");
                return new CreditRecord(
                        fields[0], fields[2],
                        Double.parseDouble(fields[3]), fields[4],
                        fields[5].trim().equals("") ? 0.0 : Double.parseDouble(fields[5]),
                        fields[6].trim().equals("") ? 0.0 : Double.parseDouble(fields[6]),
                        fields[8], Double.parseDouble(fields[15]));
            }
        });
tableEnv.createTemporaryView("CreditDetails", creditStream);
Table creditDetailsTable = tableEnv.from("CreditDetails");

Table resultsTable = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Charged Off"));
TableResult result = resultsTable.execute();
result.print();

Table resultsTable2 = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Fully Paid"));
TableResult result2 = resultsTable2.execute();
result2.print();

The above code creates two different jobs, but I don't want that; I want it to run as a single job. Is there any way to do this?
Yes. Instead of calling execute() on each table, create a StatementSet using your StreamTableEnvironment (tableEnv.createStatementSet()), add each query with addInsert(), and finally call execute() once when you want to run the job.
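For illustration, a minimal sketch of that approach applied to the two tables above. The sink names here are hypothetical placeholders; each sink would need to be registered first with a CREATE TABLE statement:

StatementSet stmtSet = tableEnv.createStatementSet();
// Each addInsert() only buffers an INSERT INTO <sink>; nothing runs yet.
stmtSet.addInsert("charged_off_sink", resultsTable);   // sink names are illustrative
stmtSet.addInsert("fully_paid_sink", resultsTable2);
// A single execute() compiles both pipelines into one job that
// shares the CreditDetails source.
stmtSet.execute();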
I have tried that too. For example:

tableEnv.createTemporaryView("CreditDetails", creditStream);
tableEnv.executeSql(
        "CREATE TABLE output (loanId VARCHAR) WITH ("
        + "'connector.type' = 'filesystem',"
        + "'connector.path' = 'file:///path/Downloads/1',"
        + "'format.type' = 'csv')");
Table creditDetailsTable = tableEnv.sqlQuery(
        "SELECT id FROM CreditDetails WHERE loanStatus = 'Charged Off'");
Table creditDetailsTable2 = tableEnv.sqlQuery(
        "SELECT id FROM CreditDetails WHERE creditScore < 700");

StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsert("output", creditDetailsTable);
stmtSet.addInsert("output", creditDetailsTable2);
stmtSet.execute();

Table outputTable1 = tableEnv.from("output");
Table resultsTable1 = outputTable1.select($("*"));
TableResult result1 = resultsTable1.execute();
result1.print();

But this prints a result of 0 rows. What am I doing wrong? Also, the execution graph shows two different jobs getting created: a collect job and an insert job. Can you help me understand what's going on?
Hi tbud, you still have two executes; there should only be one. Can you try the following instead of using outputTable1?

TableResult result1 = stmtSet.execute();
result1.print();
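To spell out the reasoning: every execute() submits its own job, which is why the execution graph showed both an insert job and a collect job. A sketch of the single-job version, reusing the tables from the snippet above:

StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsert("output", creditDetailsTable);   // INSERT #1
stmtSet.addInsert("output", creditDetailsTable2);  // INSERT #2

// One execute() submits both inserts as a single job. Note that
// print() on an INSERT result shows a summary of affected row
// counts, not the inserted rows themselves; to inspect the rows,
// read the written files or run a query as a separate job.
stmtSet.execute().print();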
/"TableResult result1 = stmtSet.execute();
result1.print();"/ I tried this, and the result is following : Job has been submitted with JobID 4803aa5edc31b3ddc884f922008c5c03 +--------------------------------------------+--------------------------------------------+ | default_catalog.default_database.output1_1 | default_catalog.default_database.output1_2 | +--------------------------------------------+--------------------------------------------+ | -1 | -1 | +--------------------------------------------+--------------------------------------------+ 1 row in set Output folder is created but with empty files. When I run my queries individually it gives me 68 and 107 rows respectively. The problem starts happening when I add two statements to a StatementSet. Has anybody faced this issue ? what was the solution ? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |