Hello,
I'm trying to create a custom operator to explore the internals of Flink. The one I'm working on is rather similar to Union, and I'm trying to mimic it for now. When I run my job, though, this error arises:

Exception in thread "main" java.lang.IllegalArgumentException: Unknown operator type: MyOperator - My Operator
	at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
	at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
	at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
	at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
	at org.apache.flink.api.common.Plan.accept(Plan.java:348)
	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
	at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
	at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
	at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
	at io.radicalbit.flinkh2o.Job.main(Job.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

I looked at the location of the error, but it's not clear to me how to make my operator recognizable to the optimizer.

Thanks,
Simone
Hi Simone,
the GraphCreatingVisitor transforms the common operator plan into a representation that is translated by the optimizer. You have to implement an OptimizerNode and an OperatorDescriptor to describe the operator. Depending on the semantics of the operator, there are a few more places to touch to make the integration work, such as driver strategies, the cost model, etc. I would recommend having a look at previous changes that added an operator, such as PartitionOperator, SortPartitionOperator, OuterJoin, etc. The respective commits should give you an idea which parts of the code need to be touched. You should find the commit IDs in the JIRA issues for these extensions.
Cheers, Fabian

2016-04-29 15:32 GMT+02:00 Simone Robutti <[hidden email]>:
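For context on where this fails: GraphCreatingVisitor.preVisit maps each common-operator class it knows to an optimizer plan node, and any operator it does not recognize hits the IllegalArgumentException from the stack trace above. Below is a simplified, self-contained Scala sketch of that dispatch pattern; the types are toy stand-ins, not Flink's actual classes (the real visitor is Java code in flink-optimizer):

object DispatchSketch {
  // Toy model of the optimizer's operator-to-plan-node dispatch.
  // All types here are hypothetical stand-ins for illustration.
  sealed trait CommonOperator
  final case class UnionOp(name: String) extends CommonOperator
  final case class MyOp(name: String) extends CommonOperator

  sealed trait PlanNode
  final case class UnionNode(op: UnionOp) extends PlanNode
  final case class MyOpNode(op: MyOp) extends PlanNode

  // Every operator class the visitor knows gets a plan node; an operator
  // without a branch here is exactly the "Unknown operator type" failure.
  def preVisit(op: CommonOperator): PlanNode = op match {
    case u: UnionOp => UnionNode(u) // existing branch for a known operator
    case m: MyOp    => MyOpNode(m)  // the branch a new operator must add
    // without a matching branch, the real visitor throws:
    // throw new IllegalArgumentException("Unknown operator type: " + op)
  }
}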
Hello Fabian,
we dug deeper starting from the input you gave us, but a question arose. We had always assumed that runtime operators were open for extension without modifying anything inside Flink, but it looks like this is not the case, and the documentation assumes that the developer is working on a contribution to Flink. So I would like to know if our understanding is correct and custom runtime operators are not supposed to be implemented outside of Flink.
Thanks,
Simone

2016-04-29 21:32 GMT+02:00 Fabian Hueske <[hidden email]>:
Hi Simone,
you are right, the interfaces you extend are not considered to be public, user-facing API. Adding custom operators to the DataSet API touches many parts of the system and is not straightforward. Maybe the functionality can be achieved with the existing operators.

2016-05-03 12:54 GMT+02:00 Simone Robutti <[hidden email]>:
I'm not sure this is the right way to do it, but we were exploring all the possibilities and this one is the most obvious. We also spent some time studying how to do it, to achieve a better understanding of Flink's internals. What we want to do, though, is to integrate Flink with another distributed system that builds its own nodes and coordinates through the network with its own logic. This software is H2O (a machine learning platform), and the integration consists of two big tasks: the first is to instantiate an H2O node in every task manager and handle the lifecycle of the node according to the task manager and the execution graph. The second is to allow the developer to code everything inside Flink, converting from and to H2O's data structures (distributed tabular data) and triggering the execution of algorithms on H2O with a uniform API. Here's a simple example (assuming that we will use the Table API):

val env = ExecutionEnvironment.getExecutionEnvironment
val h2oEnv = H2OEnvironment.getEnvironment(env)

val myData: Table = ...
val someOtherData: Table = ...

val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)

val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)
val predictions: Table = linearRegressionModel(someOtherData)

predictions.select(...)

A good solution should allow the system to keep the H2O nodes alive through multiple tasks, and should make it possible to move the data locally from Flink to H2O. The latter is not achieved in H2O's integration with Spark, but we still hope to do it. That said, I'm still not sure whether implementing a custom runtime operator is really required, but given the complexity of integrating two distributed systems, we assumed that more control would allow more flexibility and more ways to reach an ideal solution.

2016-05-03 13:29 GMT+02:00 Fabian Hueske <[hidden email]>:
Hi Simone,
sorry for the delayed answer. I have a few questions regarding your requirements and some ideas that might be helpful (depending on the requirements).

1) "Keep the H2O nodes alive through multiple tasks"
- The first option (starting a node for each job) would allow sharing the H2O node across all tasks of a job. This could be done using two MapPartition operators: the first mapper is put in front of the first task requiring H2O and starts an H2O service before the first record is forwarded; the second is put after the last H2O task and shuts the service down after the last element was forwarded. The mappers themselves do nothing but forward elements and start and stop the service (a sketch of this follows below). If you would like to share H2O nodes across jobs, we might need another hook to start the process.
- "Move the data locally from Flink to H2O": do you mean host-local or JVM-local? I think it should not be hard to keep the data host-local.

2) "Allow the developer to code everything inside Flink"
- The Table API which you are referring to in your example is built on top of the DataSet and DataStream APIs. I think it should be possible to add another API similar to the Table API. You should be aware that the Table API is currently quite actively developed and should not be considered a stable interface, so certain things might change in the next versions. With 1.0 we stabilized the DataSet API, and I would rather put a new API on top of it than on the Table API.
- Regarding the transformation into H2O structures and calling H2O operations, I think this might again be done in MapPartition operators. In general, MapPartition gives you a lot of freedom because it provides an iterator over all elements of a partition, so you can do things before the first and after the last element, and group data as you like. You can use partitionByHash() or rebalance() to shuffle data and sortPartition() to locally sort the data in a partition. Please note that MapPartition operators do not support chaining and therefore come with a certain serialization overhead. Whenever possible you should use a MapFunction or FlatMapFunction, which are a bit more lightweight.

2016-05-03 15:13 GMT+02:00 Simone Robutti <[hidden email]>:
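A minimal sketch of the two-MapPartition bracketing idea described above, using the Scala DataSet API. H2ONode is a hypothetical stand-in for H2O's real lifecycle API, and the sketch assumes the rich function's open()/close() hooks are an acceptable place to start and stop the service:

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._ // TypeInformation implicits at the call site
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Hypothetical stand-in for H2O's actual node lifecycle API.
object H2ONode {
  def start(): Unit = println("starting H2O node") // placeholder
  def stop(): Unit = println("stopping H2O node")  // placeholder
}

// Forwards records unchanged; starts the H2O service before the first record.
class StartH2O[T] extends RichMapPartitionFunction[T, T] {
  override def open(parameters: Configuration): Unit = H2ONode.start()
  override def mapPartition(values: java.lang.Iterable[T], out: Collector[T]): Unit =
    values.asScala.foreach(r => out.collect(r))
}

// Forwards records unchanged; stops the H2O service after the last record.
class StopH2O[T] extends RichMapPartitionFunction[T, T] {
  override def mapPartition(values: java.lang.Iterable[T], out: Collector[T]): Unit =
    values.asScala.foreach(r => out.collect(r))
  override def close(): Unit = H2ONode.stop()
}

// Usage: bracket the H2O section of the plan, e.g. for a DataSet[String]:
//   data.mapPartition(new StartH2O[String])
//     /* ... H2O-related transformations ... */
//     .mapPartition(new StopH2O[String])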
> You wrote you'd like to "instantiate a H2O's node in every task manager". This reads a bit like you want to start H2O in the TM's JVM, but I would assume that an H2O node runs as a separate process. So should it be started inside the TM JVM or as an external process next to each TM? Also, do you want to start one H2O node per TM slot or per TM?

My idea is to run it in the same process, but there may be several good reasons not to do it; it's just the way I think of it right now. I'm thinking about replicating the structure of Sparkling Water, and to my understanding, they run their H2O nodes in the same process.

> You wrote you'd like to "handle the lifecycle of the node according to the taskmanager and the execution graph". A TM can execute multiple jobs, each with its own execution graph. Do you want to start the H2O node for each job and shut it down when the job finishes, or start the H2O node when the TM is started and kill it when the TM is brought down? There are different trade-offs for both choices.

I assume that for most use cases there's nothing inside H2O that should be shared between different jobs, so it should follow the job's lifecycle. In the previous mail this was ambiguous, my bad.

> "Move the data locally from Flink to H2O": do you mean host-local or JVM-local? I think it should not be hard to keep the data host-local.

JVM-local. This is clearly not an issue on Flink's side, but it may be an issue on H2O's side. It's one of the many issues we will tackle as soon as we talk with them (I hope soon).

> The Table API which you are referring to in your example is built on top of the DataSet and DataStream APIs. I think it should be possible to add another API similar to the Table API. You should be aware that the Table API is currently quite actively developed and should not be considered a stable interface, so certain things might change in the next versions. With 1.0 we stabilized the DataSet API, and I would rather put a new API on top of it than on the Table API.

We know, but we will work mostly with the data abstractions of the Table API, not the operations. We accept the risk of reworking it if they change in the meantime.

Your reply really helped: many of your questions helped us clear our minds on a few points. H2O's team has shown interest in working on this integration, or at least in supporting us in the development. We are waiting for them to start a discussion, and as soon as we have a clearer idea of how to proceed, we will validate it against what you just said. Your confidence in Flink's operators gives us hope of achieving a clean solution.

Thanks a lot for your time,
Simone

2016-05-09 12:24 GMT+02:00 Fabian Hueske <[hidden email]>:
2016-05-09 14:56 GMT+02:00 Simone Robutti <[hidden email]>:
That should be possible by starting a thread from a MapPartition operator. To make it one H2O node per TM, you would need a synchronized singleton to prevent each parallel task from starting its own thread.
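A minimal sketch of such a per-JVM singleton; the thread body is a placeholder for whatever actually runs the embedded H2O node:

object H2ONodeSingleton {
  private var node: Option[Thread] = None

  // Called from open() of each parallel task; only the first call in a
  // given TaskManager JVM actually launches the node thread.
  def ensureStarted(): Unit = synchronized {
    if (node.isEmpty) {
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          // placeholder: start and run the embedded H2O node here
          println("H2O node thread running")
        }
      })
      t.setDaemon(true) // don't keep the TM JVM alive for this thread alone
      t.start()
      node = Some(t)
    }
  }
}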
The approach with two MapPartition operators at the beginning and end of the H2O section might work then.
Sure :-) Cheers, Fabian