Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll, but Flink says: "ReduceFunction of reduce can not be a RichFunction. Please use reduce(ReduceFunction, WindowFunction) instead." Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio
Hi Flavio,
the reason is that under the covers the ReduceFunction will be used as the ReduceFunction of a ReducingState, and those cannot be rich functions because we cannot provide all the required context "inside" the state backend.

You can see how the ReduceFunction is used to create a ReducingStateDescriptor here:
https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300

Best,
Aljoscha
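[Editor's note: to illustrate the constraint, here is a toy stand-in (not Flink's actual implementation; the class and names below are made up): a reducing state only stores a plain binary merge function and applies it deep inside the state backend, where there is no operator lifecycle that could drive a RichFunction's open()/close().

```java
import java.util.function.BinaryOperator;

// Toy stand-in for a ReducingState: it holds one current value and a plain
// merge function. Note there is no hook here where open() could be called
// with runtime context -- the function is only ever *applied*.
class ToyReducingState<T> {
    private final BinaryOperator<T> reduceFn;
    private T current;

    ToyReducingState(BinaryOperator<T> reduceFn) {
        this.reduceFn = reduceFn;
    }

    void add(T value) {
        current = (current == null) ? value : reduceFn.apply(current, value);
    }

    T get() {
        return current;
    }
}
```

Because the backend only ever calls the merge function, any setup a RichFunction expects to happen in open() would silently never run, which is presumably why Flink rejects rich reduce functions up front.]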
Thanks Aljoscha for the reply. So what can I do in my reduce function that contains transient (i.e. non-serializable) variables?
What are you trying to do in the ReduceFunction? Without knowing the code, maybe an aggregate(AggregateFunction) is the solution.

Best,
Aljoscha
In my reduce function I want to compute some aggregations on the sub-results of a mapPartition (which I tried to migrate from DataSet to DataStream without success). The original code was something like:

  input.mapPartition(new RowToStringSketches(sketchMapSize))
      .reduce(new SketchesStringReducer())
      .map(new SketchesStringToStatsPojo(colIndex, topK));

I asked about simulating the mapPartition function in the streaming environment in another thread on the mailing list [1] because I was not able to test it: it seems that the program was exiting before being able to process anything. So I gave up on replacing the DataSet API with the DataStream API for the moment; it seems there are still too many things to migrate.

By the way, this is the reduce function:

  public class SketchesStringReducer extends RichReduceFunction<Tuple2<byte[], byte[]>> {
    private static final long serialVersionUID = 1L;

    private transient ArrayOfItemsSerDe<String> serDe;

    @Override
    public void open(Configuration parameters) throws Exception {
      this.serDe = new ArrayOfStringsSerDe();
    }

    @Override
    public Tuple2<byte[], byte[]> reduce(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2)
        throws Exception {
      // merge HLL
      final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
      final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
      final Union union = new Union(hll1.getLgConfigK());
      union.update(hll1);
      union.update(hll2);
      final byte[] hllSketchBytes = union.getResult().toCompactByteArray();

      // merge Items
      final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
      final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
      final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
      return new Tuple2<>(hllSketchBytes, itemSketchBytes);
    }
  }

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-use-stream-API-with-this-program-td36715.html#a36767
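[Editor's note: one common workaround for a non-serializable helper like the serde above is to initialize the transient field lazily on first use instead of in open(). Here is a self-contained sketch of that pattern; FakeSerDe and LazyReducer are made-up stand-ins for the DataSketches types, which are not needed to show the idea.

```java
import java.io.Serializable;
import java.util.Locale;

// FakeSerDe stands in for a helper like ArrayOfStringsSerDe that cannot
// (or should not) be serialized along with the function object.
class FakeSerDe {
    String normalize(String s) {
        return s.trim().toLowerCase(Locale.ROOT);
    }
}

class LazyReducer implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: not shipped with the serialized function, recreated on demand
    private transient FakeSerDe serDe;

    private FakeSerDe serDe() {
        if (serDe == null) {  // first call after construction *or* after deserialization
            serDe = new FakeSerDe();
        }
        return serDe;
    }

    public String reduce(String a, String b) {
        return serDe().normalize(a) + "|" + serDe().normalize(b);
    }
}
```

Because the helper is recreated inside the worker on first call, this works even when the function object is serialized and shipped to the cluster, with no open() hook required.]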
I think that should work with an aggregate() instead of reduce().
Best,
Aljoscha
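[Editor's note: Flink's AggregateFunction<IN, ACC, OUT> contract (createAccumulator/add/getResult/merge) lets the accumulator carry the merging work that the reduce did. Below is a sketch of what such a migration could look like; it mirrors the shape of the contract with a local interface so it compiles without Flink, and uses a count-plus-concatenation accumulator as a stand-in for the HLL/Items sketch merging. All names here are hypothetical.

```java
// Local mirror of the shape of Flink's AggregateFunction<IN, ACC, OUT>;
// in a real job you would implement
// org.apache.flink.api.common.functions.AggregateFunction instead.
interface AggFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Stand-in accumulator: where the thread's code merges HLL and Items
// sketches, this toy just tracks a count and a concatenation.
class ToyAcc {
    long count;
    StringBuilder items = new StringBuilder();
}

class ToySketchAggregator implements AggFn<String, ToyAcc, String> {
    @Override public ToyAcc createAccumulator() { return new ToyAcc(); }

    @Override public ToyAcc add(String value, ToyAcc acc) {
        acc.count++;
        acc.items.append(value);
        return acc;
    }

    @Override public String getResult(ToyAcc acc) {
        return acc.count + ":" + acc.items;
    }

    @Override public ToyAcc merge(ToyAcc a, ToyAcc b) {
        a.count += b.count;
        a.items.append(b.items);
        return a;
    }
}
```

One caveat: as far as I know, the AggregateFunction passed to a window aggregate() also cannot be a RichFunction, for the same state-backend reason, so a non-serializable helper would still need the lazy-initialization pattern rather than open().]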
Ok, thanks for the suggestion, but I think I'll wait for another Flink version before migrating from DataSet to DataStream. In my experience it is very helpful to have open()/close() on all operators.

Best,
Flavio