Hi,
I am brand new to Apache Flink, so please excuse any silly questions. I have an Iterator function, defined below, that I am adding as a source to a Flink stream. When I try to pass configuration information to it (via a Spring env), I notice that one of the threads calls hasNext() on an object that is not the same instance, and the passed information is null. Something is constructing that object, but strangely, if I add a default constructor, I do not see it being called by the thread that has the null data. So I am wondering what is going on. Any ideas? How do we pass configuration information to these functions? Any help would be appreciated.

Thanks,
Rick

    import java.util.LinkedList;

    import com.arangodb.entity.BaseDocument;
    import org.apache.flink.annotation.Public;
    import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;

    // ArangoDBSource is my own Iterator<LinkedList<BaseDocument>> implementation.
    @Public
    public class NodeSource extends FromIteratorFunction<LinkedList<BaseDocument>> {
        private static final long serialVersionUID = 1L;

        public NodeSource(ArangoDBSource iterator) {
            super(iterator);
        }
    }
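A minimal, Flink-free sketch of the effect described above, using only the JDK. It assumes the source object reaches the worker through standard Java serialization, which is how Flink ships function objects such as sources (Flink's `Function` interface extends `java.io.Serializable`). The class and field names are hypothetical. For a `Serializable` class, deserialization does not run that class's own constructors (only the no-arg constructor of the first non-serializable superclass, here `Object`), so the worker gets a new instance whose transient fields are null and whose constructor side effects never happened.

```java
import java.io.*;

public class SerializationDemo {

    // Counts how often ConfiguredSource's no-arg constructor actually runs.
    static int constructorCalls = 0;

    // Hypothetical stand-in for a source that was configured on the client (e.g. by Spring).
    static class ConfiguredSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Plain serializable field: its value travels with the object.
        String endpoint;

        // Transient field (e.g. a non-serializable injected bean): not written, arrives as null.
        transient Object injectedClient;

        ConfiguredSource() {
            constructorCalls++;
        }
    }

    // Serializes and deserializes an object, roughly what happens when a job is shipped to the cluster.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ConfiguredSource original = new ConfiguredSource();
        original.endpoint = "http://localhost:8529"; // hypothetical value
        original.injectedClient = new Object();

        ConfiguredSource copy = roundTrip(original);

        System.out.println("same object? " + (copy == original));     // same object? false
        System.out.println("constructor calls: " + constructorCalls); // constructor calls: 1
        System.out.println("endpoint: " + copy.endpoint);             // endpoint: http://localhost:8529
        System.out.println("injectedClient: " + copy.injectedClient); // injectedClient: null
    }
}
```

The copy is a different object, the constructor counter stays at 1, the plain field survives, and the transient field comes back as null, which matches the three symptoms in the question.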
Hi Roderick,

Luckily there are no silly questions, just silly answers (so I have the harder job here ;) ).

It seems that you are trying to read data from an ArangoDB database, right?

What is important to understand is that the "Flink job" you are implementing in your main() method gets executed in the JVM that submits your job to the Flink cluster. There, we are just constructing a graph of dataflow operations, which will then be distributed to the cluster. As part of this process, we serialize all the user code on the client and send it over to the cluster, where it gets executed in a distributed fashion.
Further reading: https://ci.apache.org/projects/flink/flink-docs-master/concepts/flink-architecture.html

I assume you are seeing "null" references because the objects you are trying to send to the cluster are not serializable (and are therefore stored in a transient field, which arrives empty on the cluster), or because Spring is doing some dependency-injection magic that does not work in the remote Flink environment.

tl;dr: What I would recommend is to implement a custom SourceFunction that reads from ArangoDB. A RichParallelSourceFunction will allow you to read with parallelism > 1, and it has lifecycle methods for opening and closing the connection to ArangoDB. For passing the configuration, I would pass a _serializable_ object through the constructor of your custom source.

Best,
Robert

On Thu, May 28, 2020 at 6:40 PM Roderick Vincent <[hidden email]> wrote:
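Reading with parallelism > 1 implies one more step: every parallel instance of a source runs the same code, so unless each instance reads a different slice of the data, the records are emitted once per instance. A minimal, Flink-free sketch of one common way to split the work, assuming the data can be divided into a hypothetical list of work units (for example collection names or key ranges). Inside a RichParallelSourceFunction, the two integers would come from `getRuntimeContext().getIndexOfThisSubtask()` and `getRuntimeContext().getNumberOfParallelSubtasks()`; the `ShardAssignment` helper itself is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class ShardAssignment {

    private ShardAssignment() {}

    /**
     * Returns the work units that the parallel instance with the given index should read.
     * Unit i goes to instance (i % parallelism), so every unit is read by exactly one instance.
     */
    public static List<String> assignedShards(List<String> allShards, int subtaskIndex, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex out of range");
        }
        List<String> mine = new ArrayList<>();
        for (int i = subtaskIndex; i < allShards.size(); i += parallelism) {
            mine.add(allShards.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("s0", "s1", "s2", "s3", "s4");
        for (int idx = 0; idx < 2; idx++) {
            System.out.println(idx + " -> " + assignedShards(shards, idx, 2));
        }
        // 0 -> [s0, s2, s4]
        // 1 -> [s1, s3]
    }
}
```

Round-robin is only one choice; contiguous ranges per instance work as well, as long as the slices are disjoint and together cover all the data.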
Hi Roderick,

adding to Robert's response: The easiest way is to have all needed information injected only in the driver (your main() program), from which you manually pass the config in serializable form to your iterator. A config could, for example, be a Java Map with serializable keys and values, such as Strings. If you need non-serializable objects, it is common practice to initialize them in Rich(ParallelSource)Function#open: pass all information needed to construct them through the constructor of the RichFunction and store that information in serializable fields.

On Fri, May 29, 2020 at 1:26 PM Robert Metzger <[hidden email]> wrote:
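A minimal, Flink-free sketch of the pattern Arvid describes, runnable with only the JDK. `ArangoLikeSource`, `FakeDbClient` and the `"arangodb.url"` key are hypothetical. In real Flink code of that time (1.x API) the class would extend RichParallelSourceFunction and override `open(Configuration)` and `close()`; here plain `open()`/`close()` methods stand in for those lifecycle hooks, and a serialization round trip stands in for shipping the source to the cluster.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

public class OpenLifecycleSketch {

    // Hypothetical stand-in for a non-serializable driver object (e.g. a database client).
    static final class FakeDbClient {
        final String url;

        FakeDbClient(String url) {
            this.url = url;
        }
    }

    // Config arrives through the constructor; the non-serializable client is built in open().
    static class ArangoLikeSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Serializable config handed over by main(); HashMap<String, String> serializes fine.
        private final HashMap<String, String> config;

        // Not serializable, so transient: never shipped, rebuilt on the worker in open().
        private transient FakeDbClient client;

        ArangoLikeSource(Map<String, String> config) {
            this.config = new HashMap<>(config); // copy into a known-serializable Map type
        }

        void open() {
            client = new FakeDbClient(config.get("arangodb.url"));
        }

        void close() {
            client = null; // a real client would be closed here
        }

        FakeDbClient client() {
            return client;
        }
    }

    // Serializes and deserializes an object, standing in for job submission to the cluster.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("arangodb.url", "http://arangodb:8529"); // hypothetical key and URL

        ArangoLikeSource onClient = new ArangoLikeSource(config); // built in main() on the client
        ArangoLikeSource onWorker = roundTrip(onClient);          // the copy the cluster receives

        System.out.println(onWorker.client());     // null: not opened yet
        onWorker.open();
        System.out.println(onWorker.client().url); // http://arangodb:8529
        onWorker.close();
    }
}
```

If the `client` field were not transient, `writeObject` would fail with a `NotSerializableException`, which is why the client is created in `open()` from the serialized config instead.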
--
Arvid Heise | Senior Java Developer
Ververica GmbH
Thank you both for your answers, and yes, that does explain what's going on. I will have to refactor this code. Thanks again for your help!

Rick

On Fri, May 29, 2020 at 2:29 PM Arvid Heise <[hidden email]> wrote: