Hello everybody,

I'm currently refactoring some code and am looking for a better alternative for handling JPMML models in data streams. At the moment the Flink job I'm working on references the model object as a singleton, which I want to change because static references tend to cause problems in distributed systems. I thought about handing the model object to the function that uses it via its closure. The object can be between 16 MB and 250 MB in size (depending on the depth of the decision tree). According to https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables that's way too large, though. Are there any viable alternatives, or would this be the "right way" to handle this situation?
Best Regards, Julian
|
How about using a source and a broadcast variable? You could write the model to storage (DFS), then read it with a source and use a broadcast variable to send it to all tasks. A single record can be very large, so this should work even if your model is quite big. Does that sound feasible?

In future versions of Flink you may be able to skip the "write to DFS" step and simply have the model in a collection source (once large RPC messages are supported).

Best,
Stephan
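A minimal sketch of this approach, assuming the DataSet API (the file paths, the broadcast variable name "pmmlModel", the ScoringMapper class, and the line-based transport of the PMML document are illustrative assumptions, not taken from the thread):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    import java.util.List;

    public class BroadcastModelJob {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Model that was previously written to DFS, read back by a source.
            DataSet<String> model = env.readTextFile("hdfs:///models/decision-tree.pmml");
            DataSet<String> events = env.readTextFile("hdfs:///input/events");

            events
                .map(new ScoringMapper())
                .withBroadcastSet(model, "pmmlModel") // ship the model to every task
                .print();
        }

        public static class ScoringMapper extends RichMapFunction<String, String> {

            private transient String pmml; // rebuilt on every task, hence transient

            @Override
            public void open(Configuration parameters) {
                // Broadcast variables are materialized as a List on each task;
                // the text lines of the PMML file are re-joined here for brevity.
                List<String> lines = getRuntimeContext().getBroadcastVariable("pmmlModel");
                this.pmml = String.join("\n", lines);
                // ...build the JPMML Evaluator from `pmml` here...
            }

            @Override
            public String map(String event) {
                // ...score `event` with the evaluator...
                return event;
            }
        }
    }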
|
Hi Stephan,

thanks for your reply! It seems as if I can only use broadcast variables on DataSet operators (using myFunc.withBroadcastSet(…)). Is that right? I am working on a DataStream, though. Do streams offer similar functionality?

Best Regards,
Julian
|
I think you could make use of this small component I've developed: https://gitlab.com/chobeat/Flink-JPMML It's specifically for using JPMML on Flink. Maybe there's too much stuff in it for what you need, but you could reuse the code of the operator to do what you need.
|
Hi Simone,

that sounds promising! Unfortunately your link leads to a 404 page.

Best Regards,
Julian
|
Yes, sorry, but it's private and I just discovered we don't want to release it publicly yet. This piece of code could help you, though: https://gist.github.com/chobeat/f07221357a2e3f9efa377e4cb0479f92

Ignore all the stuff about the strategies. The important part is the `open` method and the transient var. They are used to load the PMML file and instantiate all the JPMML objects when the Flink operator is set up. The variable `pmmlSource` is a string, but you can replace that part with a load from HDFS or another FS if you want every node to load the PMML file in parallel and be in control of that part.
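A minimal sketch of that pattern for a DataStream job, assuming the model is loaded from a path reachable by every TaskManager (the class name, the constructor parameter, and the exact JPMML factory calls are assumptions; the factory API differs between JPMML-Evaluator versions):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.dmg.pmml.PMML;
    import org.jpmml.evaluator.Evaluator;
    import org.jpmml.evaluator.ModelEvaluatorFactory;
    import org.jpmml.model.PMMLUtil;

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class PmmlScoringFunction extends RichMapFunction<String, String> {

        private final String pmmlPath;          // serializable configuration
        private transient Evaluator evaluator;  // not serializable, rebuilt in open()

        public PmmlScoringFunction(String pmmlPath) {
            this.pmmlPath = pmmlPath;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Parse the PMML document and build the evaluator once per function instance.
            try (InputStream in = Files.newInputStream(Paths.get(pmmlPath))) {
                PMML pmml = PMMLUtil.unmarshal(in);
                this.evaluator = ModelEvaluatorFactory.newInstance().newModelEvaluator(pmml);
            }
        }

        @Override
        public String map(String record) {
            // ...map `record` to the evaluator's input fields and call evaluator.evaluate(...)...
            return record;
        }
    }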
|
Hi Simone,

thank you for your feedback! The code snippet you provided works fine. The only drawback is that the Evaluator gets initialized once per function instance. That could lead to high memory consumption, depending on the level of parallelism and the size of the PMML model (which can get quite big). The "obvious" optimization would be to initialize and hide the Evaluator behind a singleton, since it is thread-safe. (Which is what I wanted to avoid in the first place, but maybe that is the best solution at the moment?)

Best Regards,
Julian
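A minimal sketch of that singleton variant, sharing one thread-safe Evaluator per JVM (i.e. per TaskManager) across all parallel subtasks; the holder class and the path-based loading are illustrative assumptions, not code from the thread:

    import org.dmg.pmml.PMML;
    import org.jpmml.evaluator.Evaluator;
    import org.jpmml.evaluator.ModelEvaluatorFactory;
    import org.jpmml.model.PMMLUtil;

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Lazily builds one Evaluator per JVM; parallel subtasks on the same
    // TaskManager share it instead of each holding their own copy.
    public final class EvaluatorHolder {

        private static volatile Evaluator evaluator;

        private EvaluatorHolder() {}

        public static Evaluator get(String pmmlPath) throws Exception {
            if (evaluator == null) {
                synchronized (EvaluatorHolder.class) {
                    if (evaluator == null) {
                        try (InputStream in = Files.newInputStream(Paths.get(pmmlPath))) {
                            PMML pmml = PMMLUtil.unmarshal(in);
                            evaluator = ModelEvaluatorFactory.newInstance().newModelEvaluator(pmml);
                        }
                    }
                }
            }
            return evaluator;
        }
    }

A rich function's open() would then call EvaluatorHolder.get(pmmlPath) instead of building its own instance, trading the static reference back in for lower memory usage per TaskManager.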
|
> The only drawback is that the Evaluator gets initialized once per function instance.

I believe that reducing the number of function instances is something that should be handled by Flink's runtime, and that's why I've implemented the solution this way. In our tests the number of instances was minimal, but this is still extremely experimental, so take it with a grain of salt. I believe this is highly dependent on the expected size of the PMML models, though.
|