Posted by
devinbost on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Need-help-using-AggregateFunction-instead-of-FoldFunction-tp31421p31463.html
They released Pulsar 2.4.2, and I was able to pull its dependencies and
successfully submit the Flink job.
It's able to receive messages from the Pulsar topic successfully. However, I
still don't think I'm using the AggregateFunction correctly.
I added logging statements everywhere in my code, and I'm able to see my
message reach the `add` method in the AggregateFunction that I implemented,
but the `getResult` method is never called.
I also never see the
"Ran dataStream. Adding sink next"
line from the code below appear in my log, and the only log statements from
the JsonConcatenator class come from the `add` method.
DataStream<String> combinedEnvelopes = dataStream
    // Turn each incoming message into a (key, payload) tuple.
    .map(new MapFunction<String, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
        }
    })
    // Key by the first tuple field, then group into 20-second session windows.
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    .aggregate(new JsonConcatenator());
Logger logger = LoggerFactory.getLogger(StreamJob.class);
logger.info("Ran dataStream. Adding sink next");
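One possible explanation, offered as an assumption because the snippet above does not show whether timestamps and watermarks are assigned: an event-time window only produces a result once the watermark passes the end of the window. Until then, elements keep reaching `add`, but `getResult` is not called. The toy model below is not Flink code, and every name in it is hypothetical. It only illustrates that ordering for a single session and a single key.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single event-time session window for one key.
// This is NOT Flink code; the class and method names are hypothetical.
final class SessionWindowSketch {
    private final long gapMillis;
    private final List<String> buffered = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();
    private long windowEnd = Long.MIN_VALUE;

    SessionWindowSketch(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    // Plays the role of add(): runs for every element as it arrives.
    void onElement(String value, long timestampMillis) {
        buffered.add(value);
        windowEnd = Math.max(windowEnd, timestampMillis + gapMillis);
    }

    // Plays the role of getResult(): runs only once the watermark
    // has reached the end of the session.
    void onWatermark(long watermarkMillis) {
        if (!buffered.isEmpty() && watermarkMillis >= windowEnd) {
            emitted.add("[" + String.join(", ", buffered) + "]");
            buffered.clear();
            windowEnd = Long.MIN_VALUE;
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        SessionWindowSketch window = new SessionWindowSketch(20_000L);
        window.onElement("a", 1_000L);
        window.onElement("b", 5_000L);
        System.out.println(window.emitted()); // nothing emitted yet: no watermark seen
        window.onWatermark(30_000L);
        System.out.println(window.emitted()); // the session result is emitted now
    }
}
```

If this is what is happening in the job, the two usual directions would be to assign timestamps and watermarks before the window, or to switch to `ProcessingTimeSessionWindows.withGap(...)`, which fires on wall-clock time instead.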
-------------
private static class JsonConcatenator
        implements AggregateFunction<Tuple2<String, String>,
                                     Tuple2<String, String>, String> {
    // Static so the logger is not serialized along with the function.
    private static final Logger logger = LoggerFactory.getLogger(SplinklerJob.class);

    // Start each window with an empty key and an empty payload.
    @Override
    public Tuple2<String, String> createAccumulator() {
        return new Tuple2<>("", "");
    }

    // Called once per element: append the payload to the accumulated string.
    @Override
    public Tuple2<String, String> add(Tuple2<String, String> value,
                                      Tuple2<String, String> accumulator) {
        logger.info("Running Add on value.f0: " + value.f0
                + " and value.f1: " + value.f1);
        return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
    }

    // Called when the window fires: wrap the accumulated payloads in brackets.
    @Override
    public String getResult(Tuple2<String, String> accumulator) {
        logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
        return "[" + accumulator.f1 + "]";
    }

    // Called when two session windows are merged into one.
    @Override
    public Tuple2<String, String> merge(Tuple2<String, String> a,
                                        Tuple2<String, String> b) {
        logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1
                + " and b.f1: " + b.f1 + ")");
        return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
    }
}
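A side note on the accumulator logic itself, separate from the question of why `getResult` is not called: because the accumulator starts as an empty string and `add` always prepends ", ", the final result would begin with a stray separator. The sketch below reproduces only the string handling with plain Java and no Flink dependency. The class and method names are hypothetical, and the variant that skips the separator is just one way to handle it.

```java
// Stand-in for the accumulator's string handling only; no Flink dependency.
// All names here are hypothetical.
final class ConcatAccumulatorSketch {
    // Mirrors the posted add(): always prepends ", ", even on an empty accumulator.
    static String addAsPosted(String acc, String value) {
        return acc + ", " + value;
    }

    // Variant that omits the separator when the accumulator is still empty.
    static String addSkippingEmpty(String acc, String value) {
        return acc.isEmpty() ? value : acc + ", " + value;
    }

    // Mirrors the posted getResult(): wrap the accumulated string in brackets.
    static String getResult(String acc) {
        return "[" + acc + "]";
    }

    public static void main(String[] args) {
        String posted = "";
        String skipping = "";
        for (String v : new String[] {"{\"a\":1}", "{\"b\":2}"}) {
            posted = addAsPosted(posted, v);
            skipping = addSkippingEmpty(skipping, v);
        }
        System.out.println(getResult(posted));   // prints [, {"a":1}, {"b":2}]
        System.out.println(getResult(skipping)); // prints [{"a":1}, {"b":2}]
    }
}
```

The same empty-side check would apply to `merge`, since either accumulator passed to it may still be empty.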
Any ideas?
Chris Miller-2 wrote
> I hit the same problem. As far as I can tell, it should be fixed in
> Pulsar 2.4.2. The release has already passed voting, so I hope it
> will be available in a day or two.
>
> https://github.com/apache/pulsar/pull/5068