To a naive Flink newcomer (me) it's a little surprising that there is no pure "data" mechanism for specifying a Flink pipeline, only "code" interfaces. With the DataStream interface I can use Java, Scala or Python to set up a pipeline and then execute it - but that doesn't really seem to need a programming model, it seems like configuration, which could be done with data? OK, one does need occasionally to specify some custom code, e.g. a ProcessFunction, but for any given use-case, a relatively static library of such functions would seem fine. My use case is that I have lots of customers, and I'm doing a similar job for each of them, so I'd prefer to have a library of common code (e.g. ProcessFunctions), and then specify each customer's specific requirements in a single config file. To do that in Java, I'd have to do metaprogramming (to build various pieces of Java out of that config file). Flink SQL seems to be the closest solution, but doesn't appear to support fundamental Flink concepts such as timers (?). Is there a plan to evolve Flink SQL to support timers? Timeouts is my specific need. Thanks, |
Hi Pilgrim, Currently table indeed could not using low level api like timer, would a mixture of sql & datastream could satisfy the requirements? A job might be created via multiple sqls, and connected via datastream operations. Best, Yun ------------------------------------------------------------------ |
Hi Pilgrim, it sounds to me as if you are planning to use Flink for batch processing by having some centralized server to where you submit your queries. While you can use Flink SQL for that (and it's used in this way in larger companies), the original idea of Flink was to be used in streaming applications. For these applications, the query is not just configuration; the query IS the application. For these applications, you'd usually spawn a few K8s pods and let your application+Flink run. Hence, the many code interfaces. Even if you use SQL on streaming, you'd usually start a new ad-hoc cluster for your application to have better isolation. There are quite a few deployments that use YARN or Mesos to provide a large number of nodes which are then used by a large number of Flink batch and streaming jobs, but I'd usually not recommend that for newer users. I'd even go as far as to say that most of these organizations wouldn't use that stack if they'd create a cluster now and instead would also go for a K8s solution. On Tue, Feb 9, 2021 at 1:05 PM Yun Gao <[hidden email]> wrote:
|
Hi Arvid, thanks for the response. I don't think I was being very clear. This is a streaming application. Each customer will have a Flink job running in its own private cluster. The rough shape of the analysis that the Flink job is doing for each customer is always the same, because we do just one thing - (IoT service management). So our DAG is always the same shape. But the specific requirements of each customer are slightly different. The exact shape of their input data, the exact metrics they need to gather. So for example, if they care about timeouts, then customer 1 might want 10 minute timeouts whenever message fields A or B change, but customer 2 might want 2 day timeouts whenever message fields C or D change. For maintainability it would make sense if we could encapsulate all the code which is common to all customers into one standard Flink library, and separate out the bits which are different somewhere else. Let's call that "somewhere else" a config file. The config file then becomes a complete specification of the transform. We can version it, and do CI/CD etc. easily - and any enhancement or bug-fixing on our core Flink library can be inherited by all customers. Part of the challenge is Java's strong typing - the messages flowing through Flink always have a similar shape, but their exact fields will be customer-specific. Java needs to know about this at compile time, which is fine. And simple numeric parameters, such as e.g. timeout intervals, can be pulled from a config file at runtime. But consider the above example: for one customer I want to set a timer when fields A & B in the message change. For another customer I want to set a timer when fields C & D change. Because the only way I can set a timer is with code, this means I need to write some code to do this. Which means that my configuration has to be code. Code is infinitely powerful, but infinitely complicated and dangerous. It's not really ideal to have to use code in a configuration. Ideally, I'd like my config file to say something like this to define timers: T1=timeout(A) T2=timeout(B) I could solve this with meta-programming: a) Either I could hold my messages in an abstract type (e.g. my messages could just be Maps), though that's pretty inefficient and seems like I'm bypassing a lot of the benefits of Java's type safety and speed. b) Or I could take my specification and turn it into a Java class before I compile my Flink job. But that seems a bit ugly? Hope I've at least made clear the goal! Thanks for any further thoughts, -Pilgrim -- Learn more at https://devicepilot.com @devicepilot +44 7961 125282 See our latest features and book me for a video call. On Thu, 11 Feb 2021 at 11:03, Arvid Heise <[hidden email]> wrote:
|
Hi Pilgrim, Thank you for clarifying. I solved a similar challenge in Spark (for batch) by doing the following (translated into Flink terms): - I created most of the application with the Table API - the programmatic interface to Flink SQL. Here it is quite easy to implement structural variance of similar queries. The fields and types can be dynamically set through configuration that is injected while composing the query. - Configuration was specified in some CSV (the users were Excel-fanatics) and specific to the domain. (I'd recommend HOCON else) - For low level constructs (e.g. timers), you can convert a Table to a DataStream and use ProcessFunctions and then convert back to Table. [1] With that approach the users were able to aggregate >100 different event types from ~10 types of sources (the source type needed to be added once to the application by me - mostly because the schema needed to be incorporated). The big advantage (and what I assume works in Flink as well) was that the optimizer smartly grouped the aggregations, such that the number of aggregation operators solely depended on the number of unique sources. I'm happy to provide more details Arvid On Thu, Feb 11, 2021 at 3:51 PM Pilgrim Beart <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |