window function not working when control stream broadcast

Sam Huang
Hi all,

I connected my data stream with my control stream and created an event-time tumbling window, and everything works fine. But when I add .broadcast() to the control stream, the window function no longer works.

I'm running this locally; the code is here:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);
    DataStream<String> jsonEventStream = JsonEventStreamReader.readStream(env);
    DataStream<String> controlStream = JsonEventStreamReader.readControlStream(env); //.broadcast(); With broadcast, window function doesn't work
    jsonEventStream
        .flatMap(new StrToTuplesFlatMapFunImpl())
        .connect(controlStream)
        .flatMap(new DataFilterFunImpl())
        .assignTimestampsAndWatermarks(getTimestampsWatermarksAssigner())
        .keyBy(0, 1, 2, 3)
        .timeWindow(Time.seconds(WINDOW_LENGTH))
        .allowedLateness(Time.seconds(WINDOW_LATENESS))
        .reduce(new ReduceFunImpl(), new WindowFunImpl())
        .addSink(new InfluxDBSink(INFLUXDB_DB));

    env.execute();
}
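
For readers following along, here is a minimal plain-Java sketch of how an event-time tumbling window is assigned and when it fires. It does not use any Flink classes; the class and method names are illustrative assumptions, and the 10-second window length is chosen only for the example (the value of WINDOW_LENGTH is not given in the post). The point it shows: the window function runs only once the watermark reaches the last timestamp of the window, so a watermark that stalls keeps the window from firing even while records keep arriving.

```java
public class TumblingWindowSketch {

    /** Start of the tumbling window containing the timestamp (all values in ms). */
    public static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** An event-time window fires once the watermark reaches its last timestamp (end - 1). */
    public static boolean fires(long windowStartMs, long sizeMs, long watermarkMs) {
        long maxTimestamp = windowStartMs + sizeMs - 1;
        return watermarkMs >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 10_000L; // hypothetical 10 s window
        long start = windowStart(23_500L, size);
        System.out.println("window start: " + start);
        System.out.println("fires at watermark 29998: " + fires(start, size, 29_998L));
        System.out.println("fires at watermark 29999: " + fires(start, size, 29_999L));
    }
}
```

With these numbers the record at 23500 ms falls into the window starting at 20000 ms, which fires only when the watermark reaches 29999 ms.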

Re: window function not working when control stream broadcast

Sam Huang
By the way, the reduce function works well: I printed out the data, and it is all correct, as are the timestamps and watermarks. If I remove ".broadcast()", the data is written to the sink successfully.

Any help?

Re: window function not working when control stream broadcast

Timo Walther
Hi Sam,

Could you explain the behavior a bit more? How does the window function
behave? Is it not triggered, or what is its content? What is the result
if you don't use a window function?

Timo


On 08/03/17 at 02:59, Sam Huang wrote:

> btw, the reduce function works well, I've printed out the data, and they are
> all correct. So are the timestamps and watermarks. And if I remove
> ".broadcast()", the data is successfully sinked.
>
> Any help?
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/window-function-not-working-when-control-stream-broadcast-tp12093p12094.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.



Re: window function not working when control stream broadcast

Sam Huang
Hi Timo,

The window function writes the data to InfluxDB, and it is not triggered.
If I comment out ".timeWindow" and print the results after the reduce function, it works.
The code for the window function is here:

private static class WindowFunImpl implements WindowFunction<KVTuple6,Point,Tuple,TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
                      Collector<Point> collector) throws Exception {
        // After the reduce, the window holds a single pre-aggregated element.
        KVTuple6 kvTypeTuple = iterable.iterator().next();
        System.out.println("window: " + kvTypeTuple); // Not printed when broadcast() is used
        Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
            .time(window.getStart(), TimeUnit.MILLISECONDS)
            .tag(TAG_DOMAIN, kvTypeTuple.f0)
            .tag(TAG_DEVICE, kvTypeTuple.f1)
            .tag(TAG_TYPE, kvTypeTuple.f2)
            .tag(TAG_KEY, kvTypeTuple.f3)
            .addField(FIELD, kvTypeTuple.f4);

        collector.collect(builder.build());
    }
}


Re: window function not working when control stream broadcast

Aljoscha Krettek
Hi Sam,
Could you please also send the code for DataFilterFunImpl and your timestamp/watermark assigner? That could help in figuring out the problem.

Best,
Aljoscha



Re: window function not working when control stream broadcast

Sam Huang
Hi Aljoscha,

Here's the code:
private static class DataFilterFunImpl extends RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
    private JSONParser parser;
    // Local to each parallel instance of this function; filled from the control stream.
    private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>();

    // tuple5(domain, device_type, type, key, count_or_sum)
    @Override
    public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector) throws Exception {
        String type = dataTuple.f2;
        String[] keyValue = dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
        String key = keyValue[0];
        switch (type) {
            case RawEventExtractor.Constants.VALUE_COUNT: {
                if (whiteListMap.containsKey(key)) {
                    ControlJsonConfig ruleConfig = whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
                    if (ruleConfig != null) {
                        String value = keyValue.length > 1 ? keyValue[1] : "";
                        String bucket = ruleConfig.getBucketName(value);
                        if (bucket != null) {
                            dataTuple.setField(String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket), 3);
                            collector.collect(dataTuple);
                        }
                    } else {
                        collector.collect(dataTuple);
                    }
                }
                break;
            }
            case RawEventExtractor.Constants.VALUE_SUM: {
                if (whiteListMap.containsKey(key) && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) {
                    collector.collect(dataTuple);
                }
                break;
            }
            default: collector.collect(dataTuple);
        }
    }

    @Override
    public void flatMap2(String jsonStr, Collector<KVTuple6> collector) throws Exception {
        // Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value();
        try {
            if (parser == null) {
                parser = new JSONParser();
            }
            JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
            Tuple2<String, Map<String, ControlJsonConfig>> config = RawEventExtractor.getKeyConfig(jsonConfig);
            if (config.f1 == null) {
                whiteListMap.remove(config.f0);
            } else {
                whiteListMap.put(config.f0, config.f1);
            }
        } catch (Exception e) {
            // Note: malformed control messages are silently dropped here.
        }
    }
}
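
As background on why broadcast() matters for a function like this: whiteListMap is an ordinary field, so every parallel instance of the function keeps its own copy. The plain-Java sketch below models that without using Flink. The class and method names, the parallelism of 4, and the keys "k1" and "k2" are all hypothetical, and modelling the non-broadcast case as round-robin is an assumption about how the control stream would be distributed. It shows that with round-robin routing each control message reaches one instance only, while with broadcast every instance learns every key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ControlRoutingSketch {

    /** Stand-in for one parallel instance of the filter, with its own local whitelist. */
    public static class FilterInstance {
        private final Set<String> whiteList = new HashSet<>();

        public void onControl(String key) { whiteList.add(key); }

        public boolean accepts(String key) { return whiteList.contains(key); }
    }

    /** Creates the given number of independent filter instances. */
    public static List<FilterInstance> instances(int parallelism) {
        List<FilterInstance> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            result.add(new FilterInstance());
        }
        return result;
    }

    /** Round-robin: each control message reaches exactly one instance. */
    public static void sendRoundRobin(List<FilterInstance> targets, List<String> controlKeys) {
        for (int i = 0; i < controlKeys.size(); i++) {
            targets.get(i % targets.size()).onControl(controlKeys.get(i));
        }
    }

    /** Broadcast: each control message reaches every instance. */
    public static void sendBroadcast(List<FilterInstance> targets, List<String> controlKeys) {
        for (String key : controlKeys) {
            for (FilterInstance target : targets) {
                target.onControl(key);
            }
        }
    }

    /** Number of instances that would let a data record with this key through. */
    public static int countAccepting(List<FilterInstance> targets, String key) {
        int count = 0;
        for (FilterInstance target : targets) {
            if (target.accepts(key)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> controlKeys = Arrays.asList("k1", "k2");

        List<FilterInstance> roundRobin = instances(4);
        sendRoundRobin(roundRobin, controlKeys);
        System.out.println("round-robin, instances accepting k1: " + countAccepting(roundRobin, "k1"));

        List<FilterInstance> broadcast = instances(4);
        sendBroadcast(broadcast, controlKeys);
        System.out.println("broadcast, instances accepting k1: " + countAccepting(broadcast, "k1"));
    }
}
```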

FYI, if I call setParallelism on both the control stream and the data stream, the window function works. Is it necessary to do so when using broadcast()?
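
One thing worth checking when changing the parallelism changes the outcome: in Flink, an operator that receives watermarks from several parallel upstream channels advances its event-time clock to the minimum of them. The sketch below is plain Java rather than Flink code; the class name, method name, and all numbers are hypothetical, and nothing in this thread confirms that this is what happens in the job above. It only illustrates how a single upstream channel that never emits a watermark can keep a window from firing.

```java
public class WatermarkMinSketch {

    /**
     * Event-time clock of an operator with several input channels:
     * the minimum of the latest watermark seen on each channel.
     * Returns Long.MAX_VALUE for an empty array.
     */
    public static long combinedWatermark(long[] channelWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : channelWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        long windowMaxTimestamp = 29_999L; // last timestamp of a hypothetical window

        // All four upstream channels have emitted watermarks past the window end.
        long[] allActive = {31_000L, 30_500L, 32_000L, 30_200L};
        // One channel has never emitted a watermark (modelled as Long.MIN_VALUE).
        long[] oneIdle = {31_000L, 30_500L, Long.MIN_VALUE, 30_200L};

        System.out.println("all active, window fires: "
                + (combinedWatermark(allActive) >= windowMaxTimestamp));
        System.out.println("one idle, window fires: "
                + (combinedWatermark(oneIdle) >= windowMaxTimestamp));
    }
}
```

A quick way to test this in a real job is to print the watermark seen by each parallel instance of the operator directly upstream of the window.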



Re: window function not working when control stream broadcast

Aljoscha Krettek
Hi Sam,
Could you please also send the code of the timestamp/watermark assigner? This could also affect things.

Best,
Aljoscha

