Re: Need help using AggregateFunction instead of FoldFunction

Posted by devinbost on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Need-help-using-AggregateFunction-instead-of-FoldFunction-tp31421p31543.html

I confirmed that I got no output after 20 seconds, and still none after sending more data following a wait of over a minute between batches.

My code looks like this:

PulsarSourceBuilder<String> builder = PulsarSourceBuilder
        .builder(new SimpleStringSchema())
        .serviceUrl(SERVICE_URL)
        .topic(INPUT_TOPIC)
        .subscriptionName(SUBSCRIPTION_NAME);
SourceFunction<String> src = builder.build();
DataStream<String> dataStream = env.addSource(src);

DataStream<String> combinedEnvelopes = dataStream
        .map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2 map(String incomingMessage) throws Exception {
                return mapToTuple(incomingMessage);
            }
        })
        .keyBy(0)
        //.timeWindow(Time.seconds(5))
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .aggregate(new JsonConcatenator());
//dataStream.print();

Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
logger.info("Ran dataStream. Adding sink next");
combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
        SERVICE_URL,
        OUTPUT_TOPIC,
        new AuthenticationDisabled(), // probably need to fix // AuthenticationTls()
        combinedData -> combinedData.toString().getBytes(UTF_8),
        combinedData -> "test")
);
logger.info("Added sink. Executing job.");
// execute program
env.execute("Flink Streaming Java API Skeleton");

Here is the JsonConcatenator class:

private static class JsonConcatenator
        implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
    Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
    @Override
    public Tuple2<String, String> createAccumulator() {
        return new Tuple2<String, String>("","");
    }

    @Override
    public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
        logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
        return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
    }

    @Override
    public String getResult(Tuple2<String, String> accumulator) {
        logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
        return "[" + accumulator.f1.substring(1) + "]";
    }

    @Override
    public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
        // Merge is applied when you allow lateness.
        logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
        if(b.f1.charAt(0) == '['){
            logger.info("During merge, we detected the right message starts with the '[' character. Removing it.");
            b.f1 = b.f1.substring(1);
        }
        return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
    }
}
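For comparison, here is a minimal sketch of the same concatenation logic that keeps the fragments in a list and joins them only in `getResult`, so there is no leading `", "` to strip, and merging two accumulators (including empty ones) is plain list concatenation. For session windows, `merge` is what gets called when two sessions of the same key are combined into one. To run without Flink, the sketch uses a hypothetical `Aggregator` interface that mirrors the four methods of Flink's `AggregateFunction<IN, ACC, OUT>`, and it simplifies the input to the message body (`value.f1`), since inside a keyed window the key is already known. In a real job you would implement `AggregateFunction` directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same four methods as Flink's
// AggregateFunction<IN, ACC, OUT>, so this sketch compiles without Flink.
interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Collects JSON fragments and joins them only when the window result is
// requested, so no separator clean-up is needed in getResult or merge.
class ListJsonConcatenator implements Aggregator<String, List<String>, String> {
    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<String> add(String value, List<String> accumulator) {
        // Appending to and returning the same accumulator avoids a copy per element.
        accumulator.add(value);
        return accumulator;
    }

    @Override
    public String getResult(List<String> accumulator) {
        return "[" + String.join(", ", accumulator) + "]";
    }

    @Override
    public List<String> merge(List<String> a, List<String> b) {
        // Works for empty accumulators too: no charAt(0) on an empty string.
        List<String> merged = new ArrayList<>(a);
        merged.addAll(b);
        return merged;
    }
}
```

Because the separator is added in exactly one place, `getResult` yields `[]` for an empty accumulator and no stray leading space or comma otherwise.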

Devin G. Bost

On Tue, Dec 10, 2019 at 3:22 AM Arvid Heise <[hidden email]> wrote:
getResult will only be called when the window is triggered. For a fixed-time window, it triggers at the end of the window.

However, for EventTimeSessionWindows you need gaps in the data. Can you verify that there is actually a 20-second pause in between data points for your keys?
The issue may also lie in how the event time is extracted from the sources. Could you post the relevant code as well?

Best,

Arvid
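To illustrate the event-time point, here is a simplified plain-Java model (not Flink code; class and method names are hypothetical) of a bounded-out-of-orderness watermark, similar in spirit to Flink's `BoundedOutOfOrdernessTimestampExtractor`. It assumes timestamps and watermarks are assigned upstream (for example with `assignTimestampsAndWatermarks`). The watermark trails the largest event timestamp seen so far by a fixed bound, so it advances only when new events arrive, never with wall-clock time. A session window `[start, end)` becomes eligible to fire once the watermark reaches `end - 1`.

```java
// Simplified model of a bounded-out-of-orderness watermark (not Flink code).
// The watermark trails the largest event timestamp seen so far by a fixed
// bound, so it only moves when events arrive, never with wall-clock time.
final class WatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Called for every incoming event with its event-time timestamp.
    void onEvent(long eventTimestampMillis) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMillis);
    }

    // Stays at Long.MIN_VALUE until the first event arrives (avoids underflow).
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxSeenTimestamp - maxOutOfOrdernessMillis;
    }

    // A session window [start, end) can fire once the watermark reaches
    // its last timestamp, end - 1.
    static boolean sessionCanFire(long sessionEndMillis, long watermarkMillis) {
        return watermarkMillis >= sessionEndMillis - 1;
    }
}
```

In this model, with a 20-second gap, events at 0 s and 2 s form the session `[0, 22000)`, and the watermark stays at 1000 ms no matter how long you wait in real time. Only a later event (say at 60 s) moves the watermark past 21999 ms and lets the session fire.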

On Mon, Dec 9, 2019 at 8:51 AM vino yang <[hidden email]> wrote:
Hi dev,

The time parameter of a window can have different semantics.
In a session window, it is only the inactivity gap; the extent of the window is driven by the arrival of events.
In a tumbling or sliding window, it is the size of the window.

For more details, please see the official documentation.[1]
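As a rough illustration of this difference, here is a plain-Java sketch (hypothetical names, not Flink's API) that places the same timestamps into tumbling windows of size 5 s and into session windows with a 5 s gap. Tumbling windows are aligned to multiples of the size regardless of activity (assuming zero offset); each session starts as `[t, t + gap)` and is merged with the previous one when they overlap or touch, so its length depends on how the events are spaced.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowSemanticsSketch {
    // Tumbling: the time parameter is the window size. With zero offset, the
    // window containing t starts at t rounded down to a multiple of size.
    static long[] tumblingWindow(long t, long size) {
        long start = t - Math.floorMod(t, size);
        return new long[] {start, start + size};
    }

    // Session: the time parameter is only the inactivity gap. Each event opens
    // [t, t + gap); a window that overlaps or touches the previous one is merged
    // into it, so a session keeps growing while consecutive events are at most
    // `gap` apart. Timestamps must be sorted ascending in this simple version.
    static List<long[]> sessionWindows(long[] sortedTimestamps, long gap) {
        List<long[]> sessions = new ArrayList<>();
        for (long t : sortedTimestamps) {
            if (!sessions.isEmpty()) {
                long[] last = sessions.get(sessions.size() - 1);
                if (t <= last[1]) {
                    last[1] = Math.max(last[1], t + gap); // extend the current session
                    continue;
                }
            }
            sessions.add(new long[] {t, t + gap}); // gap exceeded: start a new session
        }
        return sessions;
    }
}
```

With timestamps 0, 1000, 3000 and 12000 ms, the tumbling windows involved are `[0, 5000)` and `[10000, 15000)`, while the sessions are `[0, 8000)` and `[12000, 17000)`: the 5 s is a fixed length in the first case and only a timeout in the second.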

Best,
Vino




devinbost <[hidden email]> wrote on Fri, Dec 6, 2019 at 10:39 PM:
I think there might be a bug in
`.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
(unless I'm just not using it correctly), because I get output
when I use the simpler window
`.timeWindow(Time.seconds(5))`
but no output at all when I use the session-based window.


devinbost wrote
> I added logging statements everywhere in my code, and I can see my
> messages reach the `add` method of the AggregateFunction I implemented,
> but the getResult method is never called.
>
> In the code below, I also never see the line
> "Ran dataStream. Adding sink next"
> appear in my log, and the only log statements from the JsonConcatenator
> class come from the `add` method, as shown below.
>
>
> DataStream<String> combinedEnvelopes = dataStream
>     .map(new MapFunction<String, Tuple2<String, String>>() {
>         @Override
>         public Tuple2 map(String incomingMessage) throws Exception {
>             return mapToTuple(incomingMessage);
>         }
>     })
>     .keyBy(0)
>     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>     .aggregate(new JsonConcatenator());
>
> Logger logger = LoggerFactory.getLogger(StreamJob.class);
> logger.info("Ran dataStream. Adding sink next");
>
> -------------
>
> private static class JsonConcatenator
>         implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
>     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>     @Override
>     public Tuple2<String, String> createAccumulator() {
>         return new Tuple2<String, String>("","");
>     }
>
>     @Override
>     public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
>         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>     }
>
>     @Override
>     public String getResult(Tuple2<String, String> accumulator) {
>         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>         return "[" + accumulator.f1 + "]";
>     }
>
>     @Override
>     public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
>         logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
>         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>     }
> }
>
> Any ideas?
>
>
> Chris Miller-2 wrote
>> I hit the same problem; as far as I can tell, it should be fixed in
>> Pulsar 2.4.2. The release has already passed voting, so I hope it
>> will be available in a day or two.
>>
>> https://github.com/apache/pulsar/pull/5068
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/




