I am sorry to ask this twice. I reworded my question, though, and I never got an answer. I am trying to learn how to use the SQL API, but mix in the Streaming API where the business logic is too complex. Say 200,000 records entered window X, and it performed its business logic. How can I ensure that window Y will process exactly the records that left window X, within the same window? I am trying to do this: 200,000 records streamed in --> SQL TUMBLING WINDOW --> aggregated table (a results table with 200,000 records in it) --> PROCESS FUNCTION (aggregated table)
Alright, maybe my example needs to be more concrete. How about this: in this example, I don't want to create two windows just to re-combine what was just aggregated in SQL. Is there a way to transform the aggregate results into one DataStream object so that I don't have to aggregate again? // aggregate this stream for 15 minutes
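[Editor's note: the re-aggregation Marco describes relies on Flink's accumulator pattern (createAccumulator / add / getResult / merge). The following is a minimal, framework-free sketch of that pattern; `DailyEmployeePurchases` and its contents are hypothetical, loosely based on the names used later in this thread, and are not real Flink API.]

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free sketch of the accumulator pattern that both the
// DataStream AggregateFunction and Flink SQL user-defined aggregate
// functions follow. The class name and fields are hypothetical,
// based on the names used in this thread.
public class AccumulatorSketch {

    static class DailyEmployeePurchases {
        final List<Double> purchases = new ArrayList<>();

        // Corresponds to AggregateFunction.add(): fold one record in.
        DailyEmployeePurchases add(double purchase) {
            purchases.add(purchase);
            return this;
        }

        // Corresponds to AggregateFunction.merge(): combine partial
        // accumulators, as Flink does across parallel instances.
        DailyEmployeePurchases merge(DailyEmployeePurchases other) {
            purchases.addAll(other.purchases);
            return this;
        }

        // Corresponds to AggregateFunction.getResult().
        double total() {
            return purchases.stream().mapToDouble(Double::doubleValue).sum();
        }
    }

    public static void main(String[] args) {
        // Simulate one window's worth of records being accumulated.
        DailyEmployeePurchases acc = new DailyEmployeePurchases();
        acc.add(10.0).add(20.0);

        // Simulate merging a second partial accumulator.
        DailyEmployeePurchases other = new DailyEmployeePurchases();
        other.add(5.0);
        acc.merge(other);

        System.out.println(acc.total()); // prints 35.0
    }
}
```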
Hi Marco,
sorry for the late reply. Have you looked into user-defined aggregate functions for SQL? I think your requirements can easily be implemented there. You can declare multiple aggregate functions per window. There is also the built-in function LISTAGG that might help in your use case. Flink SQL aggregate functions also support arbitrary data types (e.g. arrays as the result type).

Regarding "do I need to wait another 15 minutes to aggregate this": this is another example of why event time is important. You actually want to process the data quicker than wall-clock time. If your example ran in event time, the watermark would be emitted after window 1 has been processed, and this watermark would also trigger the second window immediately, without the need for another 15 minutes of processing time.

I hope this helps.

Regards,
Timo

On 12.12.20 01:38, Marco Villalobos wrote:
> Alright, maybe my example needs to be more concrete. How about this:
> in this example, I don't want to create two windows just to re-combine
> what was just aggregated in SQL. Is there a way to transform the
> aggregate results into one DataStream object so that I don't have to
> aggregate again?
> // aggregate this stream for 15 minutes
> final Table employeeDailyPurchasesTable = tableEnv.sqlQuery("SELECT\n" +
>     "  t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, t.dob, SUM(s.purchase) AS purchases\n" +
>     "FROM\n" +
>     "  employee_purchases s\n" +
>     "LEFT JOIN\n" +
>     "  employees FOR SYSTEM_TIME AS OF s.procTime AS t ON t.organization = s.organization AND t.department = s.department AND t.employee_id = s.employee_id\n" +
>     "GROUP BY\n" +
>     "  TUMBLE(s.procTime, INTERVAL '15' MINUTE), t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, t.dob");
>
> // now I want everything that was just aggregated processed together,
> // below gives me each row again in a stream
> final DataStream<Row> employeeDailyPurchasesDataStream = tableEnv.toAppendStream(employeeDailyPurchasesTable, Row.class);
>
> // so, do I need to wait another 15 minutes to aggregate this? It was just aggregated for 15 minutes above!
> // how do I get the previous aggregated results into one object so that I don't have to wait and aggregate it again
> final DataStream<DailyEmployeePurchases> aggregatedAgainBecauseINeedHelp = employeeDailyPurchasesDataStream
>     .keyBy(0, 1, 2)
>     .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
>     .aggregate(new AggregateFunction<Row, DailyEmployeePurchases, DailyEmployeePurchases>() {
>
>         @Override
>         public DailyEmployeePurchases createAccumulator() {
>             return new DailyEmployeePurchases();
>         }
>
>         @Override
>         public DailyEmployeePurchases add(Row value, DailyEmployeePurchases accumulator) {
>             return accumulator.add(value);
>         }
>
>         @Override
>         public DailyEmployeePurchases getResult(DailyEmployeePurchases accumulator) {
>             return accumulator;
>         }
>
>         @Override
>         public DailyEmployeePurchases merge(DailyEmployeePurchases a, DailyEmployeePurchases b) {
>             return a.merge(b);
>         }
>     });
>
> // important business logic that needs to be applied to the group of employees
> aggregatedAgainBecauseINeedHelp.keyBy("organizationId", "departmentId")
>     .process(new KeyedProcessFunction<Tuple, DailyEmployeePurchases, DailyEmployeePurchases>() {
>
>         @Override
>         public void processElement(DailyEmployeePurchases value, Context ctx, Collector<DailyEmployeePurchases> out) throws Exception {
>             // very important stuff here
>         }
>     });
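[Editor's note: the LISTAGG route Timo mentions could be sketched against the query quoted above roughly as follows. This is an untested sketch, not a verified query: it assumes `employee_id` can be cast to STRING, and the point is that each window's group is collapsed into a single output row, so no second DataStream window is needed.]

```sql
-- Sketch only: collapse each 15-minute window into one row per
-- organization/department; LISTAGG concatenates the group's employee
-- ids into a single string column.
SELECT
  t.organization_id,
  t.department_id,
  LISTAGG(CAST(s.employee_id AS STRING)) AS employee_ids,
  SUM(s.purchase) AS purchases
FROM employee_purchases s
LEFT JOIN employees FOR SYSTEM_TIME AS OF s.procTime AS t
  ON t.organization = s.organization
 AND t.department = s.department
 AND t.employee_id = s.employee_id
GROUP BY
  TUMBLE(s.procTime, INTERVAL '15' MINUTE),
  t.organization_id,
  t.department_id
```

A user-defined aggregate function returning an array or row type could play the same role if a concatenated string is not structured enough for the downstream process function.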