Re: Need help using AggregateFunction instead of FoldFunction

Posted by vino yang on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Need-help-using-AggregateFunction-instead-of-FoldFunction-tp31421p31425.html

Hi devinbost,

Here are two examples that may help:

  • the example code in the official documentation [1];
  • a StackOverflow answer to a similar question [2].
[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#aggregatefunction
[2]: https://stackoverflow.com/questions/47123785/flink-how-to-convert-the-deprecated-fold-to-aggregrate
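
Following the pattern in those two examples, the fold in your code could probably be expressed as an accumulator that collects the JSON strings and is only turned into an array at the end. The sketch below is an untested illustration rather than a drop-in solution. To keep it runnable without Flink on the classpath, it declares a local stand-in interface with the same four methods as Flink's AggregateFunction, and it takes plain JSON strings as input; in your job the input type would be Tuple2<String, JSONObject> and add() would use value.f1.toString(). The class names JsonArrayAggregateSketch and JsonArrayAggregate are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

class JsonArrayAggregateSketch {

    // Local stand-in mirroring the four methods of Flink's
    // org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>.
    // In a real job, import Flink's interface instead of declaring this one.
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical aggregate: collects JSON strings, emits one JSON array string.
    static class JsonArrayAggregate
            implements AggregateFunction<String, List<String>, String> {

        @Override
        public List<String> createAccumulator() {
            // One empty accumulator per key and window.
            return new ArrayList<>();
        }

        @Override
        public List<String> add(String json, List<String> accumulator) {
            accumulator.add(json);
            return accumulator;
        }

        @Override
        public String getResult(List<String> accumulator) {
            // The brackets are added here, so no trailing map step is needed.
            return "[" + String.join(",", accumulator) + "]";
        }

        @Override
        public List<String> merge(List<String> a, List<String> b) {
            // Combines two partial accumulators, e.g. when session windows merge.
            a.addAll(b);
            return a;
        }
    }

    public static void main(String[] args) {
        JsonArrayAggregate agg = new JsonArrayAggregate();

        // Two partial accumulators, as if two session windows were later merged.
        List<String> first = agg.createAccumulator();
        first = agg.add("{\"id\":1}", first);
        first = agg.add("{\"id\":2}", first);

        List<String> second = agg.createAccumulator();
        second = agg.add("{\"id\":3}", second);

        // prints [{"id":1},{"id":2},{"id":3}]
        System.out.println(agg.getResult(agg.merge(first, second)));
    }
}
```

In the pipeline itself, something like .aggregate(new JsonArrayAggregate()) would then take the place of the .fold(...) call and the final map that appends "]". The merge method is the part that FoldFunction has no equivalent for.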

I hope these resources are helpful to you.

Best,
Vino

devinbost <[hidden email]> wrote on Thursday, December 5, 2019 at 9:38 AM:
Hi,

In my use case, I am attempting to create a keyedStream (on a string) and
then window that stream (which represents keyed JSON objects) with
EventTimeSessionWindows (so that I have a separate window for each set of
JSON messages, according to the key), and then concatenate the JSON objects
by their keys. (e.g. If message1, message2, and message3 all have the same
key, they should be concatenated to a JSON array like: [message1,message2,
message3].)

I think my code expresses my intent conceptually, but I learned that Fold is
deprecated because it can't perform partial aggregations. Instead, I need to
use the AggregateFunction, but I'm having trouble understanding the API
documentation. How do I convert this code to an implementation that uses the
AggregateFunction instead?

DataStream<String> combinedEnvelopes = dataStream
    .map(new MapFunction<String, Tuple2<String, JSONObject>>() {
        @Override
        public Tuple2<String, JSONObject> map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
        }
    })
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    .fold("[", new FoldFunction<Tuple2<String, JSONObject>, String>() {
        @Override
        public String fold(String concatenatedJsonArray, Tuple2<String, JSONObject> incomingMessage) {
            return concatenatedJsonArray + ", " + incomingMessage.f1.toString();
        }
    })
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String jsonPartialArray) throws Exception {
            return jsonPartialArray + "]";
        }
    })
    .returns(String.class);



