Re: Need help using AggregateFunction instead of FoldFunction

Posted by devinbost on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Need-help-using-AggregateFunction-instead-of-FoldFunction-tp31421p31478.html

I think there might be a bug in
`.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
(unless I'm just not using it correctly), because I get output
when I use the simpler window
`.timeWindow(Time.seconds(5))`
but no output at all when I use the session-based window.
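
One possible explanation, which is an assumption on my part and not something confirmed in this thread: an event-time session window only fires once a watermark passes the end of the session, whereas `.timeWindow(...)` follows the job's time characteristic and fires on the wall clock when the job runs in processing time. If the stream has no timestamps or watermarks assigned, `add` would run for every element but `getResult` would never be reached. The sketch below is plain Java, not Flink code; `SessionWindowSketch`, `onElement`, and `onWatermark` are hypothetical names that only model that firing rule for a single key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of ONE event-time session window for a single key.
// This is not Flink code; it only mimics the firing rule described above.
class SessionWindowSketch {
    private final long gapMillis;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> fired = new ArrayList<>();
    private long sessionEnd = Long.MIN_VALUE; // no open session yet

    SessionWindowSketch(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    // Plays the role of add(): the element is accumulated, nothing is emitted.
    // Simplification: every element is assumed to belong to the same session.
    void onElement(String value, long timestampMillis) {
        buffer.add(value);
        sessionEnd = Math.max(sessionEnd, timestampMillis + gapMillis);
    }

    // Plays the role of the event-time trigger plus getResult(): the session is
    // emitted only once the watermark reaches the window's last timestamp (end - 1).
    void onWatermark(long watermarkMillis) {
        if (!buffer.isEmpty() && watermarkMillis >= sessionEnd - 1) {
            fired.add("[" + String.join(", ", buffer) + "]");
            buffer.clear();
            sessionEnd = Long.MIN_VALUE;
        }
    }

    List<String> fired() {
        return fired;
    }

    public static void main(String[] args) {
        SessionWindowSketch window = new SessionWindowSketch(5_000);
        window.onElement("a", 1_000);
        window.onElement("b", 2_000);
        // No watermark has arrived yet, so the list of results is still empty.
        System.out.println(window.fired());
        // The session ends at 2_000 + 5_000, so a watermark of 6_999 releases it.
        window.onWatermark(6_999);
        System.out.println(window.fired());
    }
}
```

If this is what is happening, two things to try would be assigning timestamps and watermarks before the `keyBy`, or switching to `ProcessingTimeSessionWindows.withGap(...)` to check whether the session logic itself works.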


devinbost wrote

> I added logging statements everywhere in my code, and I can see my
> message reach the `add` method of the AggregateFunction that I
> implemented, but the `getResult` method is never called.
>
> In the code below, I also never see the
>  "Ran dataStream. Adding sink next"
> line appear in my log, and the only log statements from the
> JsonConcatenator class come from the `add` method, as shown below.
>
>
> DataStream<String> combinedEnvelopes = dataStream
>     .map(new MapFunction<String, Tuple2<String, String>>() {
>         @Override
>         public Tuple2<String, String> map(String incomingMessage) throws Exception {
>             return mapToTuple(incomingMessage);
>         }
>     })
>     .keyBy(0)
>     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>     .aggregate(new JsonConcatenator());
>
> Logger logger = LoggerFactory.getLogger(StreamJob.class);
> logger.info("Ran dataStream. Adding sink next");
>
> -------------
>
> private static class JsonConcatenator
>         implements AggregateFunction<Tuple2<String, String>,
>                 Tuple2<String, String>, String> {
>     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>     @Override
>     public Tuple2<String, String> createAccumulator() {
>         return new Tuple2<String, String>("", "");
>     }
>
>     @Override
>     public Tuple2<String, String> add(Tuple2<String, String> value,
>             Tuple2<String, String> accumulator) {
>         logger.info("Running Add on value.f0: " + value.f0
>                 + " and value.f1: " + value.f1);
>         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>     }
>
>     @Override
>     public String getResult(Tuple2<String, String> accumulator) {
>         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>         return "[" + accumulator.f1 + "]";
>     }
>
>     @Override
>     public Tuple2<String, String> merge(Tuple2<String, String> a,
>             Tuple2<String, String> b) {
>         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1
>                 + " and b.f1: " + b.f1);
>         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>     }
> }
>
>
>
>
> Any ideas?
>
>
> Chris Miller-2 wrote
>> I hit the same problem, as far as I can tell it should be fixed in
>> Pulsar 2.4.2. The release of this has already passed voting so I hope it
>> should be available in a day or two.
>>
>> https://github.com/apache/pulsar/pull/5068
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

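A side note on the quoted JsonConcatenator, separate from the missing output: because the accumulator starts as an empty string and `add` always prepends ", ", the first element would come out with a leading separator (for example "[, a, b]"). A minimal plain-Java sketch of a guard against that is below; `ConcatSketch` and `join` are hypothetical names, not part of the original job.

```java
// Hypothetical helper showing how add() and merge() could combine strings
// without producing a leading or trailing ", ".
class ConcatSketch {
    // Joins two partial results, skipping the separator when either side is empty.
    static String join(String left, String right) {
        if (left.isEmpty()) {
            return right;
        }
        if (right.isEmpty()) {
            return left;
        }
        return left + ", " + right;
    }

    public static void main(String[] args) {
        String acc = "";                  // same empty start as createAccumulator()
        acc = join(acc, "{\"a\":1}");     // first element: no separator added
        acc = join(acc, "{\"b\":2}");     // later elements: separated by ", "
        System.out.println("[" + acc + "]");
    }
}
```

The same helper would fit both `add` (accumulator plus new value) and `merge` (two accumulators), since either side can be empty there.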



