FlatMap returning Row<> based on ArrayList elements()


Andres Angel
Hello everyone,

I need to parse input data inside an anonymous function and turn it into several Row elements. Originally I would have done something like Row.of(1,2,3,4), but these elements can change on the fly as part of my function. This is why I decided to store them in a list, which currently looks something like this:

image.png

Now I need my Collector (out) to emit a Row built from these elements. I checked the Flink documentation, but lambda functions are not supported: https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html . Then I thought I could copy my ArrayList into a Tuple and pass this tuple as Row.of(mytuple):

                    Tuple mytuple = Tuple.newInstance(5);
                    for (int i = 0; i < pelements.size(); i++) {
                        mytuple.setField(pelements.get(i), i);
                    }
                    out.collect(Row.of(mytuple));


However, it doesn't work, because the tuple is parsed as a single element in the sqlQuery step. How could I do something like:

pelements.forEach(n->out.collect(Row.of(n)));

Thanks so much

Re: FlatMap returning Row<> based on ArrayList elements()

Haibo Sun
Hi Andres Angel,

I guess people (including me) don't understand your problem. I don't know whether the following sample code is what you want; if not, could you describe the problem more clearly?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
    .flatMap(new FlatMapFunction<List<Integer>, Row>() {
        @Override
        public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
            out.collect(Row.of(value.toArray(new Integer[0])));
        }
    }).print();

env.execute("test job");

Best,
Haibo
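
A note on why the array form above yields one field per element while the Tuple attempt produced a single field: Row.of is declared with a varargs parameter (Object... values), so an array argument is spread across the fields, whereas a single container object is wrapped as one value. The plain-Java sketch below shows that language rule in isolation. The class VarargsDemo and the helper fieldCount are hypothetical stand-ins for illustration, not part of Flink.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the varargs rule; no Flink dependency.
// fieldCount is a hypothetical stand-in for a varargs factory
// such as Row.of(Object... values).
final class VarargsDemo {

    // Returns how many separate values the varargs parameter received.
    static int fieldCount(Object... values) {
        return values.length;
    }

    public static void main(String[] args) {
        List<Object> pelements =
                Arrays.asList("5d2df2c2e7370c7843dad9ca", 359731, 13333196);

        // A single container object (like the Tuple) counts as ONE value.
        Object container = pelements;
        System.out.println(fieldCount(container));            // prints 1

        // An Object[] is spread: one value per element.
        System.out.println(fieldCount(pelements.toArray()));  // prints 3
    }
}
```

The same rule explains why value.toArray(new Integer[0]) works in the sample: an Integer[] is also an Object[], so it is passed as the varargs array itself.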


Re: FlatMap returning Row<> based on ArrayList elements()

Andres Angel
Hello everyone, let me be more precise about what I'm ultimately looking for, because your example is right and shows exactly how to turn an array into a Row object.
I have done it seamlessly:

out.collect(Row.of(pelements.toArray()));

Then I printed and the outcome is as expected:

5d2df2c2e7370c7843dad9ca,359731,13333196,156789925,619381

Now I need to register this DS as a table and here is basically how I'm planning to do it:

tenv.registerDataStream("newtable", ds, "col1, col2, col3, col4, col5");

However, the DataStream registration fails with an error because I need to specify the RowTypeInfo. This is the crux of the problem. Yes, I know I could use something like:


TypeInformation<?>[] types = {
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO};

DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Integer>, Row>() {
    @Override
    public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
        out.collect(Row.of(value.toArray(new Integer[0])));
    }
}).returns(new RowTypeInfo(types));


The problem with this approach is that I'm looking for a generic FlatMap anonymous function that can return, on each call: 1. a different number of elements in the array, and 2. elements whose data types also vary. Neither is fixed, so a hard-coded TypeInformation would not fit every execution.

How could I approach this?

thanks so much
AU



Re: FlatMap returning Row<> based on ArrayList elements()

Victor Wong

Hi Andres,

I'd like to share my thoughts:

When you register a "Table", you need to specify its "schema". So how can you register the table when the number of elements/columns and the data types are both nondeterministic?

Correct me if I have misunderstood you.

Best,
Victor


Re: FlatMap returning Row<> based on ArrayList elements()

Andres Angel
Hello Victor ,

You are totally right. So the question now becomes: is Flink capable of handling cases like this, where I would define the type info in the row and the Table would infer the columns (separated by commas, or something similar)?

thanks
AU

Re: Re: FlatMap returning Row<> based on ArrayList elements()

Haibo Sun
Hi AU,

> The problem with this approach is that I'm looking for a generic FlatMap anonymous function that can return, on each call: 1. a different number of elements in the array, and 2. elements whose data types also vary. Neither is fixed, so a hard-coded TypeInformation would not fit every execution.

As far as I know, there is no such standard flatMap function. A table definition requires a fixed number of columns, and even where Flink can infer the column types, those types must also be fixed. For the case you describe, the number of columns in the table should be the maximum possible number of elements. If a record has fewer elements, you should pad the row so that every column defined by the table is filled, and then return it. Where elements in the same column may have different types, you can convert them to a uniform column type defined by the table, or define a custom type that can handle the different element types.

Best,
Haibo
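
The padding and uniform-type advice above can be sketched in plain Java. This is only an illustration under stated assumptions: the class FixedWidthFields and the method toFixedWidth are hypothetical names, String is picked as the uniform column type, null is picked as the padding value, and the width of 5 matches the five columns used earlier in the thread.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch; no Flink dependency. Names are hypothetical.
final class FixedWidthFields {

    // Converts every element to String (the assumed uniform column type)
    // and pads with null up to `width` columns.
    static String[] toFixedWidth(List<?> elements, int width) {
        if (elements.size() > width) {
            throw new IllegalArgumentException(
                    "more elements than columns: " + elements.size());
        }
        String[] fields = new String[width]; // unfilled slots stay null
        for (int i = 0; i < elements.size(); i++) {
            Object e = elements.get(i);
            fields[i] = (e == null) ? null : String.valueOf(e);
        }
        return fields;
    }

    public static void main(String[] args) {
        List<Object> pelements =
                Arrays.asList("5d2df2c2e7370c7843dad9ca", 359731, 13333196);
        String[] fields = toFixedWidth(pelements, 5);
        System.out.println(Arrays.toString(fields));
        // prints [5d2df2c2e7370c7843dad9ca, 359731, 13333196, null, null]
    }
}
```

Inside the flatMap, the resulting array could then be handed to Row.of, with the declared row type listing five string columns. Whether null is an acceptable padding value depends on the downstream query, so treat that choice as an assumption to verify.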
