AllwindowStream and RichReduceFunction

AllwindowStream and RichReduceFunction

Flavio Pompermaier
Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll, but Flink says:
"ReduceFunction of reduce can not be a RichFunction. Please use reduce(ReduceFunction, WindowFunction) instead."

Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio
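
For reference, a minimal, self-contained sketch (not taken from the thread) of the kind of pipeline that triggers this error; the source and window size are invented for illustration. The exception is thrown while the job graph is built, before execute() runs:

import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichReduceOnCountWindowAll {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1L, 2L, 3L)
        .countWindowAll(100)
        // Passing a RichReduceFunction here fails with:
        // "ReduceFunction of reduce can not be a RichFunction. ..."
        .reduce(new RichReduceFunction<Long>() {
          @Override
          public Long reduce(Long a, Long b) {
            return a + b;
          }
        })
        .print();

    env.execute("rich-reduce-on-count-window-all");
  }
}
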
Re: AllwindowStream and RichReduceFunction

Aljoscha Krettek
Hi Flavio,

the reason is that under the covers the ReduceFunction will be used as
the ReduceFunction of a ReducingState. And those cannot be rich
functions because we cannot provide all the required context "inside"
the state backend.

You can see how the ReduceFunction is used to create a
ReducingStateDescriptor here:
https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300

Best,
Aljoscha
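
To make the suggested alternative concrete: the reduce(ReduceFunction, ProcessAllWindowFunction) overload keeps the incremental ReduceFunction plain, while the window-function half may be rich, so open()/close() and transient fields can live there. A minimal sketch under that assumption (the class names and the stand-in transient helper are invented, not from the thread):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// Plain (non-rich) ReduceFunction: this is what ends up inside the ReducingState.
class SumReducer implements ReduceFunction<Long> {
  @Override
  public Long reduce(Long a, Long b) {
    return a + b;
  }
}

// The window-function half is allowed to be rich, so non-serializable helpers
// can be created in open() here.
class EmitWithSetup extends ProcessAllWindowFunction<Long, Long, GlobalWindow> {
  private transient StringBuilder scratch; // stand-in for any non-serializable helper

  @Override
  public void open(Configuration parameters) {
    this.scratch = new StringBuilder();
  }

  @Override
  public void process(Context ctx, Iterable<Long> preAggregated, Collector<Long> out) {
    // The iterable contains the single pre-reduced value for the fired window.
    for (Long value : preAggregated) {
      out.collect(value);
    }
  }
}

Wired up, for a DataStream<Long> input, as input.countWindowAll(100).reduce(new SumReducer(), new EmitWithSetup()); countWindowAll produces GlobalWindow, hence the third type parameter.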

Re: AllwindowStream and RichReduceFunction

Flavio Pompermaier
Thanks Aljoscha for the reply. So what can I do in my reduce function that contains transient (i.e. non-serializable) variables?

Re: AllwindowStream and RichReduceFunction

Aljoscha Krettek
What are you trying to do in the ReduceFunction? Without knowing the
code, maybe an aggregate(AggregateFunction) is the solution.

Best,
Aljoscha
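
For context, a minimal sketch of what an AggregateFunction looks like (a generic toy example, not Flavio's use case): it carries an explicit accumulator through createAccumulator/add/getResult/merge, which often removes the need for a rich function in the first place.

import org.apache.flink.api.common.functions.AggregateFunction;

// Toy example: running sum and count, emitting an average per window.
class AverageAggregate implements AggregateFunction<Long, long[], Double> {

  @Override
  public long[] createAccumulator() {
    return new long[] {0L, 0L}; // {sum, count}
  }

  @Override
  public long[] add(Long value, long[] acc) {
    acc[0] += value;
    acc[1] += 1;
    return acc;
  }

  @Override
  public Double getResult(long[] acc) {
    return acc[1] == 0 ? 0.0 : ((double) acc[0]) / acc[1];
  }

  @Override
  public long[] merge(long[] a, long[] b) {
    a[0] += b[0];
    a[1] += b[1];
    return a;
  }
}

Used as stream.countWindowAll(100).aggregate(new AverageAggregate()). Note that, like the ReduceFunction, the AggregateFunction given to a window aggregate is also rejected if it is a RichFunction, so any non-serializable helper would have to be created lazily inside its methods; a sketch for the concrete case follows further down the thread.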

Re: AllwindowStream and RichReduceFunction

Flavio Pompermaier
In my reduce function I want to compute some aggregation on the sub-results of a map-partition (which I tried to migrate from the DataSet to the DataStream API without success).
The original code was something like:

 input.mapPartition(new RowToStringSketches(sketchMapSize)) //
        .reduce(new SketchesStringReducer()) //
        .map(new SketchesStringToStatsPojo(colIndex, topK));

I asked about simulating the mapPartition function in the streaming environment in another thread on the mailing list [1], because I was not able to test it: it seems that the program was exiting before being able to process anything.
So I have given up on replacing the DataSet API with the DataStream API for the moment; it seems that there are still too many things to migrate.
Btw, this is the reduce function:

public class SketchesStringReducer extends RichReduceFunction<Tuple2<byte[], byte[]>> {
  private static final long serialVersionUID = 1L;

  private transient ArrayOfItemsSerDe<String> serDe;

  @Override
  public void open(Configuration parameters) throws Exception {
    this.serDe = new ArrayOfStringsSerDe();
  }

  @Override
  public Tuple2<byte[], byte[]> reduce(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2)
      throws Exception {
    // merge HLL
    final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
    final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
    final Union union = new Union(hll1.getLgConfigK());
    union.update(hll1);
    union.update(hll2);
    final byte[] hllSketchBytes = union.getResult().toCompactByteArray();

    // merge Item
    final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
    final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
    final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
    return new Tuple2<>(hllSketchBytes, itemSketchBytes);
  }
}

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-use-stream-API-with-this-program-td36715.html#a36767

Re: AllwindowStream and RichReduceFunction

Aljoscha Krettek
I think that should work with an aggregate() instead of reduce().

Best,
Aljoscha
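
To make that concrete, a possible AggregateFunction version of the reducer above. This is only a sketch under a few assumptions: the DataSketches import paths depend on the library version in use, the maxMapSize constructor argument is assumed to play the same role as sketchMapSize upstream, and the serDe is created lazily instead of in open(), since the AggregateFunction passed to a window aggregate cannot be rich either.

import org.apache.datasketches.ArrayOfItemsSerDe;   // package names may differ by DataSketches version
import org.apache.datasketches.ArrayOfStringsSerDe;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class SketchesStringAggregate
    implements AggregateFunction<Tuple2<byte[], byte[]>, Tuple2<byte[], byte[]>, Tuple2<byte[], byte[]>> {
  private static final long serialVersionUID = 1L;

  private final int maxMapSize;                       // assumed analogue of sketchMapSize
  private transient ArrayOfItemsSerDe<String> serDe;  // created lazily, no open() available

  public SketchesStringAggregate(int maxMapSize) {
    this.maxMapSize = maxMapSize;
  }

  private ArrayOfItemsSerDe<String> serDe() {
    if (serDe == null) {
      serDe = new ArrayOfStringsSerDe();
    }
    return serDe;
  }

  @Override
  public Tuple2<byte[], byte[]> createAccumulator() {
    // Start from empty sketches so the accumulator is always well-formed and serializable.
    return new Tuple2<>(
        new HllSketch().toCompactByteArray(),
        new ItemsSketch<String>(maxMapSize).toByteArray(serDe()));
  }

  @Override
  public Tuple2<byte[], byte[]> add(Tuple2<byte[], byte[]> value, Tuple2<byte[], byte[]> acc) {
    return mergeSketches(acc, value);
  }

  @Override
  public Tuple2<byte[], byte[]> getResult(Tuple2<byte[], byte[]> acc) {
    return acc;
  }

  @Override
  public Tuple2<byte[], byte[]> merge(Tuple2<byte[], byte[]> a, Tuple2<byte[], byte[]> b) {
    return mergeSketches(a, b);
  }

  // Same merge logic as SketchesStringReducer#reduce above.
  private Tuple2<byte[], byte[]> mergeSketches(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2) {
    final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
    final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
    final Union union = new Union(hll1.getLgConfigK());
    union.update(hll1);
    union.update(hll2);
    final byte[] hllSketchBytes = union.getResult().toCompactByteArray();

    final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe());
    final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe());
    final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe());
    return new Tuple2<>(hllSketchBytes, itemSketchBytes);
  }
}

Wired up as something like input.countWindowAll(n).aggregate(new SketchesStringAggregate(sketchMapSize)).map(new SketchesStringToStatsPojo(colIndex, topK)); whether this actually covers the original mapPartition semantics is exactly the open question from [1].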

Re: AllwindowStream and RichReduceFunction

Flavio Pompermaier
Ok, thanks for the suggestion, but I think I'll wait for another Flink version before migrating from the DataSet to the DataStream API...
In my experience it is very helpful to have open/close on all operators.

Best,
Flavio
