FLink Streaming - Parallelize ArrayList

defstat
Hi all,

Is there a way to either broadcast or parallelize an ArrayList of custom objects?

The application is as follows:

I have two streams read from two sockets. The first stream contains vectors that should populate a single list of vectors: for each arriving vector, calculations over the vectors already in the list and the new one decide whether it becomes part of the list.

The second stream also contains vectors; these use the above list and sort it according to the information they carry.
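A minimal plain-Java sketch of the per-element logic just described, with no Flink types involved. Vectors are modeled as double[]; the membership rule (a candidate joins only if it is at least a hypothetical minDistance away from every vector already in the list) and the sort key (distance to the query vector from the second stream) are assumptions standing in for the elided calculations. VectorListLogic, offer and sortedBy are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of the application logic; no Flink API is used.
// The distance-based membership rule is a HYPOTHETICAL stand-in for the
// poster's elided "calculations".
class VectorListLogic {
    private final List<double[]> vectors = new ArrayList<>();
    private final double minDistance; // hypothetical membership threshold

    VectorListLogic(double minDistance) {
        this.minDistance = minDistance;
    }

    // Euclidean distance between two vectors of equal length.
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // First stream: decide from the existing list and the new vector
    // whether the new vector becomes part of the list.
    boolean offer(double[] candidate) {
        for (double[] v : vectors) {
            if (distance(v, candidate) < minDistance) {
                return false; // too close to an existing member: rejected
            }
        }
        vectors.add(candidate);
        return true;
    }

    // Second stream: return a copy of the list sorted by distance to the query.
    List<double[]> sortedBy(double[] query) {
        List<double[]> copy = new ArrayList<>(vectors);
        copy.sort(Comparator.comparingDouble(v -> distance(v, query)));
        return copy;
    }

    int size() {
        return vectors.size();
    }

    public static void main(String[] args) {
        VectorListLogic logic = new VectorListLogic(1.0);
        logic.offer(new double[] {0, 0});
        logic.offer(new double[] {0.5, 0}); // rejected: closer than 1.0 to (0, 0)
        logic.offer(new double[] {3, 4});
        List<double[]> sorted = logic.sortedBy(new double[] {3, 3});
        // prints "2 vectors, nearest first: [3.0, 4.0]"
        System.out.println(logic.size() + " vectors, nearest first: "
                + Arrays.toString(sorted.get(0)));
    }
}
```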

I tried to solve this using:

private static final ArrayList<Vector> aSetOfVectors = new ArrayList<Vector>();

vectorsFirstStream.fold(aSetOfVectors, new FoldFunction<Vector, ArrayList<Vector>>() {
    @Override
    public ArrayList<Vector> fold(ArrayList<Vector> accumulator, Vector value) {
        // .... (Calculations and new aSetOfVectors produced) ......
        return aSetOfVectors;
    }
});

vectorsSecondStream.fold(aSetOfVectors, new FoldFunction<Vector, ArrayList<Vector>>() {
    @Override
    public ArrayList<Vector> fold(ArrayList<Vector> accumulator, Vector value) {
        // .... (Calculations and sorted vector list produced) ......
    }
});

I think that approach can only work locally. Is there a way to parallelize it?

Thank you in advance
Re: FLink Streaming - Parallelize ArrayList

Aljoscha Krettek
Hi,
I think a connected FlatMap (CoFlatMapFunction) is best suited for what you are trying to do. With it, a single operation can have two input streams:

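DataStream<String> input1 = env.socketTextStream(...);
DataStream<String> input2 = env.socketTextStream(...);

DataStream<OUT> result = input1.connect(input2)
    .flatMap(new CoFlatMapFunction<String, String, OUT>() {
        @Override
        public void flatMap1(String value, Collector<OUT> out) {...}

        @Override
        public void flatMap2(String value, Collector<OUT> out) {...}
    });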

Here, flatMap1 receives the elements of the first input stream and flatMap2 the elements of the second. You can keep a list of the elements from the first input inside the function. Be careful, however, not to let the list grow too large; otherwise the JVM may fail with an OutOfMemoryError.
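The idea of keeping a bounded list inside a two-input function can be sketched in plain Java. The method names flatMap1/flatMap2 mirror CoFlatMapFunction, but java.util.function.Consumer replaces Flink's Collector, so this is not the Flink API. BoundedTwoInputSketch, the capacity bound and the evict-oldest policy are hypothetical choices that show one way of stopping the list from exhausting memory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for a two-input (connected) flatMap. Consumer<String>
// replaces Flink's Collector; this is NOT the Flink API itself.
class BoundedTwoInputSketch {
    private final int capacity; // hypothetical upper bound on the stored list
    private final Deque<Integer> elements = new ArrayDeque<>();

    BoundedTwoInputSketch(int capacity) {
        this.capacity = capacity;
    }

    // First input: store the element, evicting the oldest one once the
    // bound is reached so the list cannot grow without limit. Emits nothing.
    void flatMap1(int value, Consumer<String> out) {
        if (elements.size() == capacity) {
            elements.removeFirst();
        }
        elements.addLast(value);
    }

    // Second input: read the stored list and emit it sorted; the boolean
    // plays the role of the "info" carried by the incoming element.
    void flatMap2(boolean descending, Consumer<String> out) {
        List<Integer> sorted = new ArrayList<>(elements);
        Collections.sort(sorted);
        if (descending) {
            Collections.reverse(sorted);
        }
        out.accept(sorted.toString());
    }

    // Copy of the current contents, oldest first.
    List<Integer> snapshot() {
        return new ArrayList<>(elements);
    }

    public static void main(String[] args) {
        BoundedTwoInputSketch fn = new BoundedTwoInputSketch(3);
        List<String> results = new ArrayList<>();
        for (int v : new int[] {5, 1, 4, 2}) {
            fn.flatMap1(v, results::add); // 5 is evicted when 2 arrives
        }
        fn.flatMap2(false, results::add);
        System.out.println(results); // prints [[1, 2, 4]]
    }
}
```

Evicting the oldest element is only one option; a membership rule could instead reject new vectors once the list is full.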

Cheers,
Aljoscha

In reply to defstat, Tue, 29 Sep 2015 at 04:38 (Apache Flink User Mailing List archive: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FLink-Streaming-Parallelize-ArrayList-tp2957.html)
Re: FLink Streaming - Parallelize ArrayList

defstat
Will that keep a single global list for the whole execution environment?
Re: FLink Streaming - Parallelize ArrayList

Aljoscha Krettek
No, each parallel instance of the operator would have its own local list.

In a distributed environment it is very tricky to keep global state across all instances of an operation, and Flink does not support anything in this direction. If you really need it, the only way is to set the parallelism of the operator to 1, which ensures that all tuples pass through a single operator instance.
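A small plain-Java simulation (no Flink) of why per-instance lists are not global: elements are spread over a number of parallel instances, each keeping its own list. Round-robin distribution and the names LocalListsSketch and distribute are assumptions for illustration; the actual partitioning in a job may differ. With parallelism 1 the single instance sees every element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simulates `parallelism` operator instances, each with its own local list.
// Each input element reaches exactly one instance (round-robin, assumed).
class LocalListsSketch {
    static List<List<Integer>> distribute(List<Integer> input, int parallelism) {
        List<List<Integer>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>()); // one local list per instance
        }
        for (int i = 0; i < input.size(); i++) {
            instances.get(i % parallelism).add(input.get(i));
        }
        return instances;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        // prints "parallelism 3: [[1, 4], [2, 5], [3, 6]]": no list is complete
        System.out.println("parallelism 3: " + distribute(input, 3));
        // prints "parallelism 1: [[1, 2, 3, 4, 5, 6]]": one list sees everything
        System.out.println("parallelism 1: " + distribute(input, 1));
    }
}
```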

In reply to defstat, Tue, 29 Sep 2015 at 13:08 (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FLink-Streaming-Parallelize-ArrayList-tp2957p2959.html)
Re: FLink Streaming - Parallelize ArrayList

defstat
Is there any way to "broadcast" the internal list so that all processing nodes "know" it and can use it either inside the connected function or in a fold operation?
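For reference, Flink's DataStream API offers a broadcast() partitioning, which sends every element to all parallel instances of the next operator. The plain-Java simulation below (no Flink types; BroadcastReplicaSketch and its methods are hypothetical) only pictures what broadcasting the first stream would mean here: each instance builds its own replica of the list rather than sharing one global list. The replicas stay identical only under the assumption that the update is deterministic and every instance sees the first-stream elements in the same order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java simulation of broadcasting one input: every first-stream element
// is delivered to ALL instances (each keeps a replica), while second-stream
// elements go to one instance at a time (round-robin, assumed).
class BroadcastReplicaSketch {
    final List<List<Integer>> replicas = new ArrayList<>();
    private int next = 0; // round-robin pointer for the second stream

    BroadcastReplicaSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            replicas.add(new ArrayList<>());
        }
    }

    // First stream, broadcast: every replica applies the same update.
    void broadcastAdd(int value) {
        for (List<Integer> replica : replicas) {
            replica.add(value);
        }
    }

    // Second stream, not broadcast: a single instance handles the element
    // and returns its local replica sorted (a simplified "sort" step).
    List<Integer> querySorted() {
        List<Integer> local = new ArrayList<>(replicas.get(next));
        next = (next + 1) % replicas.size();
        Collections.sort(local);
        return local;
    }

    public static void main(String[] args) {
        BroadcastReplicaSketch sketch = new BroadcastReplicaSketch(3);
        for (int v : new int[] {3, 1, 2}) {
            sketch.broadcastAdd(v);
        }
        System.out.println(sketch.replicas);      // prints [[3, 1, 2], [3, 1, 2], [3, 1, 2]]
        System.out.println(sketch.querySorted()); // prints [1, 2, 3]
    }
}
```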