Flink Iterator Functions

Roderick Vincent
Hi,

I am brand new to Apache Flink, so please excuse any silly questions. I have an iterator function, defined below, that I am adding as a source to a Flink stream. When I try to pass configuration information to it (via a Spring env), I notice that one of the threads calls hasNext() on an object that is not the one I constructed, and the information I passed in is null. Something is constructing that object, but strangely, if I add a default constructor, I do not see it being called by this thread. What is going on? How do we pass configuration information to these functions? Any help would be appreciated.

Thanks,
Rick

@Public
public class NodeSource extends FromIteratorFunction<LinkedList<BaseDocument>> {

    private static final long serialVersionUID = 1L;

    public NodeSource(ArangoDBSource iterator) {
        super(iterator);
    }
}

Re: Flink Iterator Functions

rmetzger0
Hi Roderick,

Luckily there are no silly questions, just silly answers (so I have the harder job here ;) )

It seems that you are trying to read data from an Arango Database, right?
What is important to understand is that the Flink job you implement in your main() method gets executed in the JVM that submits your job to the Flink cluster. There, we are just constructing a graph of dataflow operations, which is then distributed to the cluster.
As part of this process, we serialize all the user code on the client and send it over to the cluster, where it gets executed in a distributed fashion.
I assume you are seeing "null" references either because the objects you are trying to send to the cluster are not serializable and are stored in a transient field (transient fields are skipped during serialization, so they arrive as null), or because Spring is doing some dependency-injection magic that does not work in the remote Flink environment.

tl;dr: I would recommend implementing a custom SourceFunction that reads from ArangoDB. RichParallelSourceFunction allows you to read with parallelism > 1, and it has lifecycle methods for opening and closing the connection to Arango.
To pass the configuration, I would hand a _serializable_ object to the constructor of your custom source.

Best,
Robert
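
The behaviour Robert describes can be reproduced with plain Java serialization, without a Flink cluster. The sketch below is only an illustration under stated assumptions: `DemoSource` and `roundTrip` are hypothetical names, not Flink classes, and the in-memory round trip merely stands in for shipping user code from the client to the cluster. It shows that the deserialized copy is a different object, that its transient field is null, and that no constructor of the serializable class runs during deserialization, which would explain why a default constructor is never seen being called.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class TransientRoundTrip {

    // Hypothetical stand-in for an iterator-backed source; not a Flink class.
    static class DemoSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Counts constructor invocations in this JVM (static, so never serialized).
        static int constructorCalls = 0;

        final String collection;      // serializable field: survives the round trip
        transient Object injectedEnv; // transient field: skipped by serialization

        DemoSource(String collection, Object injectedEnv) {
            constructorCalls++;
            this.collection = collection;
            this.injectedEnv = injectedEnv;
        }
    }

    // Serializes and deserializes a value in memory.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoSource original = new DemoSource("nodes", new Object());
        DemoSource copy = roundTrip(original);

        System.out.println("different object:  " + (copy != original));
        System.out.println("collection:        " + copy.collection);
        System.out.println("injectedEnv:       " + copy.injectedEnv);
        System.out.println("constructor calls: " + DemoSource.constructorCalls);
    }
}
```

Whether Spring injection or a transient field is the actual cause in the original code cannot be told from the thread; the sketch only demonstrates the transient-field case.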




Re: Flink Iterator Functions

Arvid Heise-3
Hi Roderick,

Adding to Robert's response: the easiest way is to have all needed information injected only in the driver, and from there pass the config manually to your iterator in a serializable form. The config could be, for example, a Java Map with serializable elements such as Strings.

If you need non-serializable objects, it is common practice to initialize them in Rich(ParallelSource)Function#open. Pass all the information needed to construct them through the constructor of the RichFunction and store it in serializable fields.
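
A minimal sketch of this pattern in plain Java follows, so that it runs without a Flink dependency. Everything in it is an assumption made for illustration: `ConfiguredSource`, `FakeClient` and `shipToWorker` are hypothetical names, the `open()` method here is an ordinary method standing in for the Flink lifecycle hook, and the host and port values are placeholders. In a real job the class would extend a Flink rich function and the framework would call the lifecycle method on the worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class OpenLifecycleSketch {

    // Hypothetical non-serializable client, standing in for a database driver.
    static class FakeClient {
        final String endpoint;

        FakeClient(String endpoint) {
            this.endpoint = endpoint;
        }
    }

    // Hypothetical source: carries only serializable config, builds the client late.
    static class ConfiguredSource implements Serializable {
        private static final long serialVersionUID = 1L;

        private final HashMap<String, String> config; // shipped with the function
        private transient FakeClient client;          // rebuilt after shipping

        ConfiguredSource(Map<String, String> config) {
            // Copy into a map type that is known to be serializable.
            this.config = new HashMap<>(config);
        }

        // Stand-in for the open() lifecycle hook: runs after deserialization.
        void open() {
            client = new FakeClient(config.get("host") + ":" + config.get("port"));
        }

        String endpoint() {
            if (client == null) {
                throw new IllegalStateException("open() has not been called");
            }
            return client.endpoint;
        }
    }

    // Simulates shipping the function to a worker via Java serialization.
    static ConfiguredSource shipToWorker(ConfiguredSource source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(source);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfiguredSource) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("host", "localhost"); // placeholder value
        config.put("port", "8529");      // placeholder value

        ConfiguredSource onWorker = shipToWorker(new ConfiguredSource(config));
        onWorker.open();
        System.out.println(onWorker.endpoint());
    }
}
```

The design choice is that the constructor only ever sees plain, serializable data, while anything holding sockets or injected state is created after the object has arrived where it will run.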




--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Flink Iterator Functions

Roderick Vincent
Thank you both for your answers and yes, that does explain what's going on.  I will have to refactor this code.

Thanks again for your help!
Rick
