http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Need-help-using-AggregateFunction-instead-of-FoldFunction-tp31421p31478.html
However, I don't get any output when I used the session-based window.
> I added logging statements everywhere in my code, and I'm able to see my
> message reach the `add` method in the AggregateFunction that I
> implemented,
> but the getResult method is never called.
>
> In the code below, I also never see the:
> "Ran dataStream. Adding sink next"
> line appear in my log, and the only log statements from the
> JsonConcatenator
> class come from the `add` method, as shown below.
>
>
> DataStream
> <String>
> combinedEnvelopes = dataStream
> .map(new MapFunction<String, Tuple2&lt;String, String>>() {
> @Override
> public Tuple2 map(String incomingMessage) throws Exception {
> return mapToTuple(incomingMessage);
> }
> })
> .keyBy(0)
> .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
> .aggregate(new JsonConcatenator());
>
> Logger logger = LoggerFactory.getLogger(StreamJob.class);
> logger.info("Ran dataStream. Adding sink next")
>
> -------------
>
> private static class JsonConcatenator
> implements AggregateFunction<Tuple2&lt;String, String>,
> Tuple2<String, String>, String> {
> Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
> @Override
> public Tuple2<String, String> createAccumulator() {
> return new Tuple2<String, String>("","");
> }
>
> @Override
> public Tuple2<String, String> add(Tuple2<String, String>
> value,
> Tuple2<String, String> accumulator) {
> logger.info("Running Add on value.f0: " + value.f0 + " and
> value.f1:
> " + value.f1);
> return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
> }
>
> @Override
> public String getResult(Tuple2<String, String> accumulator) {
> logger.info("Running getResult on accumulator.f1: " +
> accumulator.f1);
> return "[" + accumulator.f1 + "]";
> }
>
> @Override
> public Tuple2<String, String> merge(Tuple2<String, String>
> a,
> Tuple2<String, String> b) {
> logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " +
> a.f1
> + " and b.f1: " + b.f1);
> return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
> }
> }
>
>
>
>
> Any ideas?
>
>
> Chris Miller-2 wrote
>> I hit the same problem, as far as I can tell it should be fixed in
>> Pulsar 2.4.2. The release of this has already passed voting so I hope it
>> should be available in a day or two.
>>
>>
https://github.com/apache/pulsar/pull/5068>
>
>
>
>
> --
> Sent from:
>
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/