Posted by
devinbost on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Need-help-using-AggregateFunction-instead-of-FoldFunction-tp31421p31428.html
Thanks for the help.
I was able to make more progress (based on the documentation you provided),
but now I'm getting this exception:
org.apache.pulsar.client.impl.DefaultBatcherBuilder@3b5fad2d is not
serializable. The object probably contains or references non serializable
fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
Here's my code now:
DataStream<String> combinedEnvelopes = dataStream
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2 map(String incomingMessage) throws Exception {
return mapToTuple(incomingMessage);
}
})
.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.seconds(20)))
.aggregate(new JsonConcatenator())
.returns(String.class);
Here's the JsonConcatenator that I'm referencing above:
private static class JsonConcatenator
implements AggregateFunction<Tuple2<String, String>, Tuple2<String,
String>, String> {
@Override
public Tuple2<String, String> createAccumulator() {
return new Tuple2<String, String>("","");
}
@Override
public Tuple2<String, String> add(Tuple2<String, String> value,
Tuple2<String, String> accumulator) {
return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
}
@Override
public String getResult(Tuple2<String, String> accumulator) {
return "[" + accumulator.f1 + "]";
}
@Override
public Tuple2<String, String> merge(Tuple2<String, String> a,
Tuple2<String, String> b) {
return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
}
}
vino yang wrote
--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/