Problem with window grouping after a Flink interval join

Problem with window grouping after a Flink interval join

元始(Bob Hu)
Hello, I'd like to ask a question:
Is there a problem with applying a window group after an interval join of two Flink streaming tables? Some records that fail to match in a left join get dropped.
For example, take the join condition select * from a,b where a.id=b.id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR. From the source code, leftRelativeSize = 1 hour and rightRelativeSize = 0, and for the left stream cleanUpTime = rowTime + leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1, so an unmatched left-table record is emitted (with the right side null) 1.5 hours later. But the watermark hold-back is Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness, i.e. 1 hour. So by the time such a record is emitted, isn't the watermark already 0.5 hours ahead of the left table's rowtime? When a later group by is applied to the joined stream, these records with a null right side are dropped.
Flink version: 1.10.0.
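
To make the gap concrete, here is a minimal arithmetic sketch of how I read those formulas (my own illustration for this example, not code taken from Flink):

long HOUR = 3600_000L;
long leftRelativeSize = HOUR;  // from "a.rowtime + INTERVAL '1' HOUR"
long rightRelativeSize = 0;
long allowedLateness = 0;
long rowTime = 0;              // event time of an unmatched left row

// The unmatched left row is emitted (right side null) when its state is
// cleaned up, at rowTime + 1h + 0.5h + 1ms:
long cleanUpTime = rowTime + leftRelativeSize
        + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1;

// But the watermark is only held back by max(1h, 0) + 0 = 1h:
long watermarkDelay = Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;

// So at emission time the watermark can already be near
// cleanUpTime - watermarkDelay = rowTime + 0.5h + 1ms, i.e. half an hour
// past the row's own rowtime, and a downstream event-time group by
// treats the row as late and drops it.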

Below is a piece of my test code:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class TimeBoundedJoin {

    /**
     * Periodic watermark assigner. The watermark tracks the maximum event time
     * seen so far minus finalMaxOutOfOrderness seconds. If maxIdleTime
     * (seconds) is non-null and the source has been idle that long, the
     * watermark falls back to processing time so downstream keeps progressing.
     */
    public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
        return new AssignerWithPeriodicWatermarks<Row>() {
            private long currentMaxTimestamp = 0;
            private long lastMaxTimestamp = 0;
            private long lastUpdateTime = 0;
            private boolean firstWatermark = true;

            @Override
            public Watermark getCurrentWatermark() {
                if (firstWatermark) {
                    lastUpdateTime = System.currentTimeMillis();
                    firstWatermark = false;
                }
                if (currentMaxTimestamp != lastMaxTimestamp) {
                    lastMaxTimestamp = currentMaxTimestamp;
                    lastUpdateTime = System.currentTimeMillis();
                }
                // Idle-source fallback: advance the watermark on processing time.
                if (maxIdleTime != null && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000) {
                    return new Watermark(new Date().getTime() - finalMaxOutOfOrderness * 1000);
                }
                return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000);
            }

            @Override
            public long extractTimestamp(Row row, long previousElementTimestamp) {
                // The event time is in field 1, either as a long or a java.sql.Timestamp.
                Object value = row.getField(1);
                long timestamp;
                if (value instanceof Long) {
                    timestamp = (Long) value;
                } else {
                    timestamp = ((Timestamp) value).getTime();
                }
                if (timestamp > currentMaxTimestamp) {
                    currentMaxTimestamp = timestamp;
                }
                return timestamp;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.setParallelism(1);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // Left table: orders. Rows with order_id "000" only push the watermark
        // forward; they are filtered out in the join query below.
        List<Row> list = new ArrayList<>();
        list.add(Row.of("001", new Timestamp(sdf.parse("2020-05-13 00:00:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 00:20:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 00:40:00").getTime()), 100));
        list.add(Row.of("002", new Timestamp(sdf.parse("2020-05-13 01:00:01").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:30:00").getTime()), 100));
        list.add(Row.of("003", new Timestamp(sdf.parse("2020-05-13 02:00:02").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime()), 100));
        list.add(Row.of("004", new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime()), 100));
        list.add(Row.of("005", new Timestamp(sdf.parse("2020-05-13 04:00:04").getTime()), 100));
        DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>() {
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                for (Row row : list) {
                    ctx.collect(row);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });
        ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
        ds1.getTransformation().setOutputType(new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT));
        bsTableEnv.createTemporaryView("order_info", ds1, "order_id, order_time, fee, rowtime.rowtime");

        // Right table: payments. Rows with order_id "111" only push the
        // watermark forward. The pay record for order "003" is deliberately
        // commented out, so that order can only be emitted as an unmatched
        // left row with a null right side.
        List<Row> list2 = new ArrayList<>();
        list2.add(Row.of("001", new Timestamp(sdf.parse("2020-05-13 01:00:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 01:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 01:30:00").getTime())));
        list2.add(Row.of("002", new Timestamp(sdf.parse("2020-05-13 02:00:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime())));
        // list2.add(Row.of("003", new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime())));
        list2.add(Row.of("004", new Timestamp(sdf.parse("2020-05-13 04:00:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 04:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 04:40:00").getTime())));
        list2.add(Row.of("005", new Timestamp(sdf.parse("2020-05-13 05:00:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 05:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 05:40:00").getTime())));
        DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                for (Row row : list2) {
                    ctx.collect(row);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });
        ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
        ds2.getTransformation().setOutputType(new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP));
        bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, rowtime.rowtime");

        // Interval left join: each order matches pay records up to 1 hour after it.
        Table joinTable = bsTableEnv.sqlQuery(
                "SELECT a.*, b.order_id FROM order_info a LEFT JOIN pay b "
                        + "ON a.order_id = b.order_id "
                        + "AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' HOUR "
                        + "WHERE a.order_id <> '000'");

        // Print each joined row together with the operator's current watermark,
        // to observe whether the row is emitted behind the watermark.
        bsTableEnv.toAppendStream(joinTable, Row.class).process(new ProcessFunction<Row, Object>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                System.err.println("row:" + value + ", rowtime:" + value.getField(3)
                        + ", watermark:" + sdf.format(ctx.timerService().currentWatermark()));
            }
        });

        bsTableEnv.execute("job");
    }
}

Re: Problem with window grouping after a Flink interval join

Benchao Li-2
Hi Bob,

This is the Flink user mailing list; please post to it in English.
If you want to use Chinese, you can send your question to [hidden email].

元始(Bob Hu) <[hidden email]> wrote on Fri, Jul 3, 2020 at 3:29 PM:


--

Best,
Benchao Li

Re: Problem with window grouping after a Flink interval join

Danny Chan
In reply to this post by 元始(Bob Hu)
For SQL, we always hold back the watermark when we emit the elements; for an interval join the delay is:

return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;

For your case, the watermark is held back by 1 hour, so the left join records should not be late when they are consumed by subsequent operators.

See KeyedCoProcessOperatorWithWatermarkDelay and RowTimeIntervalJoin.getMaxOutputDelay for details.
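
A minimal sketch of that hold-back idea, as I understand it (a simplification of what KeyedCoProcessOperatorWithWatermarkDelay does, not the actual Flink source):

import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: forward every input watermark minus a fixed delay.
public class WatermarkHoldBackSketch {
    // delay = Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness
    private final long delay;

    public WatermarkHoldBackSketch(long delay) {
        this.delay = delay;
    }

    // Rows the join emits "late" relative to the input watermark are still
    // on time relative to this delayed output watermark.
    public Watermark forwardWatermark(Watermark input) {
        return new Watermark(input.getTimestamp() - delay);
    }
}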

Best,
Danny Chan
On Jul 3, 2020 at 3:29 PM +0800, 元始(Bob Hu) <[hidden email]> wrote:

Re: Problem with window grouping after a Flink interval join

Benchao Li-2
Hi Danny,

You are right, we have already accounted for the watermark delay in this case.
However, our interval join operator has a bug that can still produce records later than the watermark.
I've created an issue[1]; we can discuss it in the JIRA issue.


Danny Chan <[hidden email]> wrote on Wed, Aug 26, 2020 at 8:09 PM:
For SQL, we always hold back the watermark when we emit the elements; for an interval join the delay is:

return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;

For your case, the watermark is held back by 1 hour, so the left join records should not be late when they are consumed by subsequent operators.

See KeyedCoProcessOperatorWithWatermarkDelay and RowTimeIntervalJoin.getMaxOutputDelay for details.

Best,
Danny Chan


--

Best,
Benchao Li