Why does orders.aggregate(Aggregations.MAX, 2) return many values instead of a single value?
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Orders> orders=(DataSet<Orders>) env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv")
    .fieldDelimiter('|')
    .includeFields(mask).ignoreFirstLine()
    .tupleType(get_Order().getClass());
orders.aggregate(Aggregations.MAX, 2) ;
orders.print();
orders.writeAsCsv("/home/hadoop/Desktop/Dataset/join_instead_of_optmization", "\n", "|",WriteMode.OVERWRITE);
env.execute();

Output:

1> (280866,129457,96048.38)
1> (280867,16568,89875.17)
1> (280868,47827,118013.89)
1> (280869,104143,220415.76)
1> (280870,105512,262166.76)
1> (280871,36244,123478.83)
1> (280896,148177,92956.99)
1> (280897,83611,128889.07)
1> (280898,29863,289893.15)
1> (280899,143962,111581.46)
1> (280900,43577,26781.38)
1> (280901,87340,30915.09)
1> (280902,6769,235803.72)
1> (280903,61471,138553.46)
1> (280928,69407,168763.3)
1> (280929,114457,5392.93)
1> (280930,58939,47427.22)
1> (280931,110210,125524.13)
1> (280932,91751,11434.53)
Hi, you are printing the input, not the result of the aggregation:

val result = orders.aggregate(Aggregations.MAX, 2)
result.print

Cheers,
michele

> On 8 Jul 2015, at 02:00, hagersaleh <[hidden email]> wrote:
>
> why when use orders.aggregate(Aggregations.MAX, 2) not return one value but
> return more value
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/why-when-use-orders-aggregate-Aggregations-MAX-2-not-return-one-value-but-return-more-value-tp1977.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
Hi,

aggregate() performs an in-place aggregation, i.e., the input and output types of aggregate() are identical, but the aggregated fields are updated.

Caution: All fields that are neither associated with an aggregation function nor a grouping field have non-deterministic values. This also means that you cannot apply more than one aggregation function to the same field (e.g., computing min and max for the same field is not possible with aggregate()).

Best, Fabian
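To make the in-place behaviour described above a bit more concrete, here is a rough plain-Java model of it. This is not Flink code and not how Flink implements aggregate(); it only imitates the visible result. The Order class, its field names and types, and the maxTotalPrice helper are all made up for illustration, and the "last record wins" rule for the untouched fields is just one arbitrary choice standing in for behaviour that Flink leaves unspecified. The sample rows are taken from the output posted in the question.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of an in-place MAX aggregation. This is NOT Flink code. */
final class InPlaceMaxSketch {

    /** Hypothetical stand-in for a three-field order tuple (types are assumed). */
    static final class Order {
        final long orderKey;
        final long custKey;
        final double totalPrice;

        Order(long orderKey, long custKey, double totalPrice) {
            this.orderKey = orderKey;
            this.custKey = custKey;
            this.totalPrice = totalPrice;
        }
    }

    /**
     * Returns ONE record of the same type as the input records.
     * Only totalPrice (field 2) is meaningful in the result. The other two
     * fields are copied from whichever record was merged last, which is an
     * arbitrary choice made for this sketch, so they must not be relied on.
     * The input list itself is never modified.
     */
    static Order maxTotalPrice(List<Order> orders) {
        if (orders.isEmpty()) {
            throw new IllegalArgumentException("orders must not be empty");
        }
        Order acc = orders.get(0);
        for (Order next : orders.subList(1, orders.size())) {
            // Update the aggregated field, carry the other fields along as-is.
            acc = new Order(next.orderKey, next.custKey,
                    Math.max(acc.totalPrice, next.totalPrice));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order(280866L, 129457L, 96048.38),
                new Order(280898L, 29863L, 289893.15),
                new Order(280932L, 91751L, 11434.53));

        Order result = maxTotalPrice(orders);

        System.out.println(orders.size());     // 3: the input is unchanged
        System.out.println(result.totalPrice); // 289893.15: the maximum of field 2
        System.out.println(result.orderKey);   // 280932: NOT the key of the max row
    }
}
```

Note that the result carries order key 280932 although the maximum price belongs to order 280898, which is the kind of surprise the caution above warns about. If the whole record holding the maximum is needed, the DataSet API's maxBy(2) looks like the better fit, but please check the documentation for your Flink version.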
I did not understand what you mean
This is your code (it calls print() on the original DataSet, not on the result of the aggregation):

> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Orders> orders=(DataSet<Orders>)
> env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv")
> .fieldDelimiter('|')
> .includeFields(mask).ignoreFirstLine()
> .tupleType(get_Order().getClass());
> orders.aggregate(Aggregations.MAX, 2) ;
>
> orders.print();

You need to put the print() directly after the aggregate() or use a new variable:

orders.aggregate(Aggregations.MAX, 2).print();

or

DataSet<Orders> aggedOrders = orders.aggregate(Aggregations.MAX, 2);
aggedOrders.print();

-Matthias

On 07/08/2015 10:30 PM, hagersaleh wrote:
> I did not understand what you mean