Re: window function not working when control stream broadcast

Posted by Aljoscha Krettek on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/window-function-not-working-when-control-stream-broadcast-tp12093p12184.html

Hi Sam,
could you please also send the code of the timestamp/watermark assigner? This could also affect things.

Best,
Aljoscha


On Thu, Mar 9, 2017, at 19:58, Sam Huang wrote:
Hi Aljoscha,

Here's the code:
private static class DataFilterFunImpl extends RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
private JSONParser parser;
private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>();

@Override
// tuple5(domain, device_type, type, key, count_or_sum)
public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector) throws Exception {
String type = dataTuple.f2;
String[] keyValue = dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
String key = keyValue[0];
switch (type) {
case RawEventExtractor.Constants.VALUE_COUNT: {
if (whiteListMap.containsKey(key)) {
ControlJsonConfig ruleConfig = whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
if (ruleConfig != null) {
String value = keyValue.length > 1 ? keyValue[1] : "";
String bucket = ruleConfig.getBucketName(value);
if (bucket != null) {
dataTuple.setField(String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket), 3);
collector.collect(dataTuple);
}
} else {
collector.collect(dataTuple);
}
}
break;
}
case RawEventExtractor.Constants.VALUE_SUM: {
if (whiteListMap.containsKey(key) && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) {
collector.collect(dataTuple);
}
break;
}
default: collector.collect(dataTuple);
}
}


@Override
public void flatMap2(String jsonStr, Collector<KVTuple6> collector) throws Exception {
// Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value();
try {
if (parser == null) {
parser = new JSONParser();
}
JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
Tuple2<String, Map<String, ControlJsonConfig>> config = RawEventExtractor.getKeyConfig(jsonConfig);
if (config.f1 == null) {
whiteListMap.remove(config.f0);
} else {
whiteListMap.put(config.f0, config.f1);
}
} catch (Exception e) {}
}
}

FYI, if I setParallelism of both the control stream and data stream, the window function works. Is it necessary to do so for broadcast() function?


On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek <[hidden email]> wrote:

Hi Sam,
could you please also send the code for the DataFilterFunImpl and your timestamps/watermark assigner. That could help in figuring out the problem.

Best,
Aljoscha


On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote:
Hi Timo,

The window function sinks the data into InfluxDB, and it's not triggered.
If I comment the ".timeWindow", and print results after the reduce function, it works
Code for window function is here:

private static class WindowFunImpl implements WindowFunction<KVTuple6,Point,Tuple,TimeWindow> {

@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
Collector<Point> collector) throws Exception {

KVTuple6 kvTypeTuple = iterable.iterator().next();
System.out.println("window: " + kvTypeTuple); // Doesn't work here if use broadcast
Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)

.time(window.getStart(), TimeUnit.MILLISECONDS)

.tag(TAG_DOMAIN, kvTypeTuple.f0)

.tag(TAG_DEVICE, kvTypeTuple.f1)

.tag(TAG_TYPE, kvTypeTuple.f2)

.tag(TAG_KEY, kvTypeTuple.f3)

.addField(FIELD, kvTypeTuple.f4);

collector.collect(builder.build());
}

}


On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther <[hidden email]> wrote:
Hi Sam,

could you explain the behavior a bit more? How does the window function behave? Is it not triggered or what is the content? What is the result if you don't use a window function?

Timo


Am 08/03/17 um 02:59 schrieb Sam Huang:

btw, the reduce function works well, I've printed out the data, and they are
all correct. So are the timestamps and watermarks. And if I remove
".broadcast()", the data is successfully sinked.

Any help?



--
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.