Flink flatMap to pass a tuple and get multiple tuple

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink flatMap to pass a tuple and get multiple tuple

Soheil Pourbafrani
Hi, I want to use flatMap to pass to function namely 'parse' a tuple and it will return multiple tuple, that each should be a record in datastream object.

Something like this:

DataStream<Tuple2<Integer,Long>> res = stream.flatMap(new FlatMapFunction<tuple3<int, int, int>, Tuple2<Integer,Long>>() {

@Override
public void flatMap(Tuple3<int, int, int> t, Collector<Tuple2<Integer,Long>> collector) throws Exception {

collector.collect(parse(t.f_3));
}
})
;

that parse will return for example 6 tuples2 and I want them inserted into res datastream.
Reply | Threaded
Open this post in threaded view
|

Re: Flink flatMap to pass a tuple and get multiple tuple

Michael Latta
it would look more like:

for (Tuple2<> t2 : parse(t.f3) {
collector.collect(t2);
}

Michael

On Apr 27, 2018, at 9:08 AM, Soheil Pourbafrani <[hidden email]> wrote:

Hi, I want to use flatMap to pass to function namely 'parse' a tuple and it will return multiple tuple, that each should be a record in datastream object.

Something like this:

DataStream<Tuple2<Integer,Long>> res = stream.flatMap(new FlatMapFunction<tuple3<int, int, int>, Tuple2<Integer,Long>>() {

@Override
public void flatMap(Tuple3<int, int, int> t, Collector<Tuple2<Integer,Long>> collector) throws Exception {

collector.collect(parse(t.f_3));
}
})
;

that parse will return for example 6 tuples2 and I want them inserted into res datastream.

Reply | Threaded
Open this post in threaded view
|

Re: Flink flatMap to pass a tuple and get multiple tuple

Michael Latta
Any itterable of Tuples will work for a for loop: List, Set, etc.

Michael

On Apr 27, 2018, at 10:47 AM, Soheil Pourbafrani <[hidden email]> wrote:

Thanks, what did you consider the return type of parse method? Arraylist of tuples?

On Friday, April 27, 2018, TechnoMage <[hidden email]> wrote:

> it would look more like:
> for (Tuple2<> t2 : parse(t.f3) {
> collector.collect(t2);
> }
> Michael
>
> On Apr 27, 2018, at 9:08 AM, Soheil Pourbafrani <[hidden email]> wrote:
> Hi, I want to use flatMap to pass to function namely 'parse' a tuple and it will return multiple tuple, that each should be a record in datastream object.
> Something like this:
>
> DataStream<Tuple2<Integer,Long>> res = stream.flatMap(new FlatMapFunction<tuple3<int, int, int>, Tuple2<Integer,Long>>() {
>
> @Override
> public void flatMap(Tuple3<int, int, int> t, Collector<Tuple2<Integer,Long>> collector) throws Exception {
>
> collector.collect(parse(t.f_3));
> }
> });
>
> that parse will return for example 6 tuples2 and I want them inserted into res datastream.
>