Create Tuple Array dynamically from a DS

8 messages

Andres Angel
Hello everyone, 

I need to set the size of the Tuple that feeds a DS dynamically. Let me explain: assume the first payload I read has the format "field1,field2,field3", which requires a Tuple3<>. A later payload, however, can be "field1,field2,field3,field4", so my Tuple has to be redefined on the fly as a Tuple4<>.

How can I create this dynamically? Any ideas?

Thanks so much

Re: Create Tuple Array dynamically from a DS

Caizhi Weng
Hi Andres,

Are the payloads strings? If so, one option is to keep them as strings and parse them further with user-defined functions when you need the fields.

Another option is to store the fields in arrays.

Also, if the first three fields have the same types in both payloads, you can use a Tuple4<> for everything and set the last element to null for the three-field payload.
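A minimal plain-Java sketch of the first two options, assuming comma-separated payloads: the record stays a String (or a String[] after splitting), and a helper, hypothetically named `fieldAt`, returns a field by position or null when the payload is shorter. For simplicity it re-splits on every call. Wiring this into a Flink user-defined function is not shown.

```java
import java.util.Arrays;

public class LazyFields {

    // Hypothetical helper: returns the i-th comma-separated field of a raw payload,
    // or null if the payload has fewer fields. Nothing is parsed until it is needed.
    public static String fieldAt(String payload, int i) {
        String[] parts = payload.split(",", -1); // -1 keeps trailing empty fields
        return i < parts.length ? parts[i] : null;
    }

    // Option 2: keep all fields in an array; its length says how many fields arrived.
    public static String[] toArray(String payload) {
        return payload.split(",", -1);
    }

    public static void main(String[] args) {
        String threeFields = "field1,field2,field3";
        String fourFields = "field1,field2,field3,field4";

        System.out.println(fieldAt(threeFields, 3)); // null: the 4th field is missing
        System.out.println(fieldAt(fourFields, 3));  // field4
        System.out.println(Arrays.toString(toArray(fourFields))); // [field1, field2, field3, field4]
    }
}
```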

On Wed, Jul 24, 2019 at 10:09 AM Andres Angel <[hidden email]> wrote:

Re: Create Tuple Array dynamically from a DS

Andres Angel
Hello Weng,

Thanks for your reply. However, I'm struggling to read the content of my DS, whose payload defines how many fields each message contains, into a String. That is why I thought of using a map function on that DS.

The Tuple arity can change over time: it can go from 3 or 4 fields down to 2, and it keeps changing. How could I approach this?

Thanks so much

On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng <[hidden email]> wrote:

Re: Create Tuple Array dynamically from a DS

Caizhi Weng
Hi Andres,

Sorry, I don't quite understand your question. Do you mean how to split the string into fields?

Java's String class has a `split` method: give it a regular expression and it returns an array containing all the split fields.
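A short illustration of `String.split`. Because its argument is a regular expression, a metacharacter such as `|` must be escaped (or wrapped with `Pattern.quote`), and trailing empty fields are dropped unless a negative limit is passed. The sample payloads are made up for the demo.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

public class SplitDemo {
    public static void main(String[] args) {
        // A comma is not a regex metacharacter, so it can be passed as-is
        String[] fields = "1,a,1.1".split(",");
        System.out.println(fields.length + " " + Arrays.toString(fields)); // 3 [1, a, 1.1]

        // '|' means "or" in a regex, so it must be escaped to split on a literal pipe
        String[] records = "1,a,1.1|2,b,2.2,-2".split("\\|");
        System.out.println(records.length); // 2

        // Pattern.quote is an alternative to escaping by hand
        String[] same = "1,a,1.1|2,b,2.2,-2".split(Pattern.quote("|"));
        System.out.println(Arrays.equals(records, same)); // true

        // Trailing empty fields are removed by default...
        System.out.println("4,d,,".split(",").length);     // 2
        // ...but kept when the limit is negative
        System.out.println("4,d,,".split(",", -1).length); // 4
    }
}
```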

On Wed, Jul 24, 2019 at 10:28 AM Andres Angel <[hidden email]> wrote:

Re: Create Tuple Array dynamically from a DS

Andres Angel
Hello,

Let me list my questions properly:

* How do I capture the content of a DataStream in a String? Basically, I have a DS<String>, and the only ways I can use its content are inside a map function, by printing it, by storing it somewhere, or through SQL queries. I need the content because, depending on it, I have to create another DS and later register it as a table in the Table environment. That means I need both the values and the headers, and all of that information is inside the DS<String>. My first option was to use a map function, but apparently I can't create a new DS within a map function, let alone register it as a new table.

My second option was to create a sort of public variable to store the DS<String> content and then write my UDF, but sadly that isn't allowed either. My remaining options would be to store the content of the DS<String> in a publicly accessible variable, to turn the DS<String> into a String, or to write the content to a file, read the file back, and parse it to provide the headers and content for the new DS.

* How do I create the DS<Tuple> dynamically? After parsing as described above, I might end up with an array of 4, 3, or 2 fields, so I would need to switch between different Tuple types, or turn my content into a Row to create a DS<Row>.
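If each payload carries its own header, one way to keep both the headers and the values without fixing the number of fields at compile time is to pair them per record. This is a plain-Java sketch under a hypothetical format (a header line, a newline, then one value line, both comma-separated); the class and method names are illustrative only. In a Flink job, this per-record logic would run inside a map function rather than on a variable collected from the stream.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderPayload {

    // Hypothetical format: "<header line>\n<value line>", both comma-separated.
    // Returns column name -> raw value in column order; missing values map to null.
    public static Map<String, String> parse(String payload) {
        String[] lines = payload.split("\n", 2);
        String[] names = lines[0].split(",");
        String[] values = lines.length > 1 ? lines[1].split(",", -1) : new String[0];

        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            record.put(names[i], i < values.length ? values[i] : null);
        }
        return record;
    }

    public static void main(String[] args) {
        System.out.println(parse("id,name,score\n1,a,1.1"));       // {id=1, name=a, score=1.1}
        System.out.println(parse("id,name,score,delta\n2,b,2.2")); // {id=2, name=b, score=2.2, delta=null}
    }
}
```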

I'm looking forward to reading your comments.

thanks so much

On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng <[hidden email]> wrote:

Re: Create Tuple Array dynamically from a DS

Caizhi Weng
Hi Andres,

Thanks for the detailed explanation.

> but apparently I can't create a new DS within a map function

If you could create a new DS within the map function, you would create as many DSs as there are elements in the old DS, which doesn't seem to be what you want. I suppose you want to derive a DS<Tuple> from a DS<String>. If so, you can write something like this:

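import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DynamicTupleExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Records with 2, 3 or 4 comma-separated fields
        DataStream<String> dsString = env.fromElements("1,a,1.1", "2,b,2.2,-2", "3,c", "4,d,4.4");

        // The Tuple arity is chosen per record; Object is the common element type
        DataStream<Object> dsTuple = dsString.map(s -> {
            String[] split = s.split(",");
            if (split.length == 2) {
                return new Tuple2<>(Integer.valueOf(split[0]), split[1]);
            } else if (split.length == 3) {
                return new Tuple3<>(Integer.valueOf(split[0]), split[1], Double.valueOf(split[2]));
            } else {
                // Any other length lands here: 4+ fields use the first four,
                // fewer than 2 fields throw ArrayIndexOutOfBoundsException
                return new Tuple4<>(Integer.valueOf(split[0]), split[1], Double.valueOf(split[2]), Long.valueOf(split[3]));
            }
        });

        dsTuple.print();
        env.execute();
    }
}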

> How dynamically create the DS<Tuple>

As you can see, the code above creates a DS<Object> rather than a DS<Tuple>, because Flink can't use the generic Tuple base class as a type directly; it needs a concrete subclass such as Tuple2. It seems you want to turn this new DS into a table, but if different records have different numbers of columns, that is not good practice, because a table expects every record to have the same schema. As a workaround, you can fill a column with null when a record doesn't have it.
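A plain-Java sketch of that null-filling workaround, assuming the column types from the example above (Integer, String, Double, Long): every record is converted into a four-column array, and trailing columns a record lacks stay null. The class and method names are hypothetical. In a Flink job, the array could be copied into an `org.apache.flink.types.Row` inside a map function, with the row type declared once (for example via `returns(...)`); check that part against your Flink version. As far as I know, Flink's Tuple serializer rejects null fields (it throws a `NullFieldException`), so a Row, which allows nulls, is the safer container for padded records.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FixedSchema {

    // Assumed target schema, mirroring the example above:
    // column 0 Integer, column 1 String, column 2 Double, column 3 Long.
    static final List<Function<String, Object>> PARSERS = Arrays.<Function<String, Object>>asList(
            Integer::valueOf, s -> s, Double::valueOf, Long::valueOf);

    // Converts a record with 1..4 comma-separated fields into a fixed-width array.
    // Every record gets all four columns; absent trailing columns are null.
    public static Object[] toFixedWidth(String record) {
        String[] raw = record.split(",");
        if (raw.length > PARSERS.size()) {
            throw new IllegalArgumentException("Too many fields: " + record);
        }
        Object[] out = new Object[PARSERS.size()];
        for (int i = 0; i < raw.length; i++) {
            out[i] = PARSERS.get(i).apply(raw[i]);
        }
        return out; // remaining slots stay null
    }

    public static void main(String[] args) {
        // Prints, one per line:
        // [1, a, 1.1, null] / [2, b, 2.2, -2] / [3, c, null, null] / [4, d, 4.4, null]
        for (String r : new String[] {"1,a,1.1", "2,b,2.2,-2", "3,c", "4,d,4.4"}) {
            System.out.println(Arrays.toString(toFixedWidth(r)));
        }
    }
}
```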

Hope this solves your problem. If you have any other problems feel free to write back.

On Wed, Jul 24, 2019 at 10:50 AM Andres Angel <[hidden email]> wrote:

Re: Create Tuple Array dynamically from a DS

Andres Angel
Hello Weng,

This definitely helps a lot. However, I know my initial DS contains a single row, so in theory I would create just one DS, which is what I need. That is why I need to know how to create a new DS from the environment within a map function.

thanks so much

On Tue, Jul 23, 2019 at 11:41 PM Caizhi Weng <[hidden email]> wrote:

Re: Create Tuple Array dynamically from a DS

Caizhi Weng
Hi Andres,

In that case you should use the `flatMap` method instead of `map`. `flatMap` lets you emit multiple elements for each input element and collects them all into one DS. This works even if your DS<String> contains more than one element.

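import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DynamicTupleFlatMapExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each element packs several '|'-separated records
        DataStream<String> dsString = env.fromElements("1,a,1.1|2,b,2.2,-2", "3,c|4,d,4.4");
        DataStream<Object> dsTuple = dsString.flatMap(new FlatMapFunction<String, Object>() {
            @Override
            public void flatMap(String value, Collector<Object> out) throws Exception {
                // '|' is a regex metacharacter, so it has to be escaped
                for (String record : value.split("\\|")) {
                    String[] split = record.split(",");
                    // Emit one Tuple per record; its arity depends on the field count
                    if (split.length == 2) {
                        out.collect(new Tuple2<>(Integer.valueOf(split[0]), split[1]));
                    } else if (split.length == 3) {
                        out.collect(new Tuple3<>(Integer.valueOf(split[0]), split[1], Double.valueOf(split[2])));
                    } else {
                        out.collect(new Tuple4<>(Integer.valueOf(split[0]), split[1], Double.valueOf(split[2]), Long.valueOf(split[3])));
                    }
                }
            }
        });

        // Two input elements produce four Tuples
        dsTuple.print();
        env.execute();
    }
}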
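The flatMap contract can be tried out without a Flink cluster. In this plain-Java sketch, a `java.util.function.Consumer` stands in for Flink's `Collector` (an assumption for illustration only, not the Flink API): each input payload may emit zero, one or many records, which is how a single element expands into several. It follows the record format of the example above and additionally skips empty records such as the middle one in "a||b".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class FlatMapSketch {

    // Mirrors the flatMap body above: split the payload on '|' into records,
    // split each record on ',' and emit one array per record.
    // The Consumer plays the role of Flink's Collector.
    public static void flatMap(String value, Consumer<String[]> out) {
        for (String record : value.split("\\|")) {
            if (!record.isEmpty()) { // skip empty records such as "a||b"
                out.accept(record.split(","));
            }
        }
    }

    public static void main(String[] args) {
        List<String[]> collected = new ArrayList<>();
        for (String payload : new String[] {"1,a,1.1|2,b,2.2,-2", "3,c|4,d,4.4"}) {
            flatMap(payload, collected::add);
        }
        // Two input elements produced four output records
        System.out.println(collected.size()); // 4
        for (String[] fields : collected) {
            System.out.println(fields.length + " fields: " + Arrays.toString(fields));
        }
    }
}
```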

On Wed, Jul 24, 2019 at 11:47 AM Andres Angel <[hidden email]> wrote: