Hi guys,
I would like to ask how I could create triggers in Flink. I want to perform some operations on a dataset and, when certain conditions on an attribute of a POJO class or Tuple are met, fire a trigger, that is, start collecting data from other data sources and perform operations on it. An example: I have a dataset of the POJO class Person. My trigger activation condition is (number of Italian people > 100). If it holds, I read another data source and execute operations on it. Do you think this is possible in Flink? Thanks, Giacomo
You are referring to the DataSet (batch) API, right? In that case you can evaluate your condition in the client program by fetching a DataSet back to the client:
    List<X> myData = dataSet.collect(); // dataSet is a DataSet<X>
Note: collect() immediately triggers execution of the program in its current state and brings the result back to the client. There is also a size limit on results that can be fetched back. This limit is the Akka frame size, which is 10 MB by default but can be adjusted.
    DataSet<Z> c;
    if (b.get(0).equals(...)) {}
2015-10-30 22:40 GMT+01:00 Giacomo Licari <[hidden email]>:
|
Hi Fabian, thanks a lot for your solution. Just one more question: do you think it is possible to execute operations on dataset C inside a filter or map operator (or any other operator) as soon as some condition occurs, instead of waiting for dataset A to be processed completely? My goal is: if some condition occurs while processing dataset A, stop executing operations on A and execute operations on C instead. Some pseudocode based on your solution:
    DataSet<X> A = env.readFile(...);
    DataSet<X> C = env.readFile(...);
    A.groupBy().reduce().filter(/* check conditions here and, if they hold, start processing C */);
Thanks, Giacomo
On Fri, Oct 30, 2015 at 11:02 PM, Fabian Hueske <[hidden email]> wrote:
|
Hi Giacomo, there is no direct support for use cases like yours. The main issue is that it is not possible to modify the execution of a submitted program: once it is running, it cannot be adapted. It is also not possible to inject a condition into the data flow logic, e.g., "if this happens, follow this branch of the flow, otherwise the other one".
2015-10-31 11:53 GMT+01:00 Giacomo Licari <[hidden email]>:
|
You can also try and make the decision on the client. Imagine a program like this:
    long count = env.readFile(...).filter(...).count();
    if (count > 5) {
        env.readFile(...).map().join(...).reduce(...);
    } else {
        env.readFile(...).filter().coGroup(...).map(...);
    }
On Mon, Nov 2, 2015 at 1:35 AM, Fabian Hueske <[hidden email]> wrote:
|
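The client-side decision suggested in the thread can be tried out without a cluster. What follows is a minimal plain-Java sketch, not Flink code: in-memory lists stand in for DataSets, and countByNationality plays the role of env.readFile(...).filter(...).count(). The class name ClientSideTrigger, the fields of Person, and the upper-casing step applied to the second source are all hypothetical and chosen only for illustration. The threshold of 100 Italians comes from the original question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ClientSideTrigger {

    // Hypothetical POJO standing in for the Person class from the question.
    public static class Person {
        public final String name;
        public final String nationality;

        public Person(String name, String nationality) {
            this.name = name;
            this.nationality = nationality;
        }
    }

    // Stand-in for the first job: filter the first source and count matches.
    // In a Flink program this is where count() would trigger execution.
    public static long countByNationality(List<Person> people, String nationality) {
        return people.stream()
                .filter(p -> nationality.equals(p.nationality))
                .count();
    }

    // The client inspects the count and only then decides whether to
    // process the second source at all.
    public static List<String> run(List<Person> people, List<String> otherSource, long threshold) {
        long italians = countByNationality(people, "Italian");
        if (italians > threshold) {
            // Trigger fired: process the second source (illustrative operation).
            return otherSource.stream()
                    .map(String::toUpperCase)
                    .collect(Collectors.toList());
        }
        // Trigger did not fire: the second source is never touched.
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        List<Person> people = new ArrayList<>();
        for (int i = 0; i < 101; i++) {
            people.add(new Person("p" + i, "Italian"));
        }
        people.add(new Person("q", "German"));

        List<String> other = Arrays.asList("a", "b");
        System.out.println(run(people, other, 100));
    }
}
```

The point of the sketch is the ordering: the condition is evaluated on a finished result before the second pipeline is even defined, which matches the constraint stated in the thread that a submitted program cannot change its data flow while running.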