关于flink sql 滚动窗口无法输出结果集合

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

关于flink sql 滚动窗口无法输出结果集合

steven chen
数据没次都能进来,并且统计,但是为什么结果insert 不会保存到mysql 中?是sql的问题?还是?求大神解答
CREATE TABLE user_behavior (

itemCode VARCHAR,

ts BIGINT COMMENT '时间戳',

t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')),

proctime as PROCTIME(),

WATERMARK FOR t as t - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.version' = '0.11',

'connector.topic' = 'scan-flink-topic',

'connector.properties.group.id' ='qrcode_pv_five_min',

'connector.startup-mode' = 'latest-offset',

'connector.properties.zookeeper.connect' = 'localhost:2181',

'connector.properties.bootstrap.servers' = 'localhost:9092',

'update-mode' = 'append',

'format.type' = 'json',

'format.derive-schema' = 'true'

);
CREATE TABLE pv_five_min (
item_code VARCHAR,
dt VARCHAR,
dd VARCHAR,
pv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode',
'connector.table' = 'qrcode_pv_five_min',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = 'root',
'connector.write.flush.max-rows' = '1'
);
INSERT INTO pv_five_min
SELECT
itemCode As item_code,
DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
COUNT(*) AS pv
FROM user_behavior
GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;


 



 

Reply | Threaded
Open this post in threaded view
|

Re: 关于flink sql 滚动窗口无法输出结果集合

Benchao Li
Hi,

Flink里面watermark要求时间字段是毫秒级别的,你可以看下你的watermark是否正常。感觉可能是这里的问题。


steven chen <[hidden email]> 于2020年5月29日周五 下午2:34写道:
数据没次都能进来,并且统计,但是为什么结果insert 不会保存到mysql 中?是sql的问题?还是?求大神解答
CREATE TABLE user_behavior (

itemCode VARCHAR,

ts BIGINT COMMENT '时间戳',

t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')),

proctime as PROCTIME(),

WATERMARK FOR t as t - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.version' = '0.11',

'connector.topic' = 'scan-flink-topic',

'connector.properties.group.id' ='qrcode_pv_five_min',

'connector.startup-mode' = 'latest-offset',

'connector.properties.zookeeper.connect' = 'localhost:2181',

'connector.properties.bootstrap.servers' = 'localhost:9092',

'update-mode' = 'append',

'format.type' = 'json',

'format.derive-schema' = 'true'

);
CREATE TABLE pv_five_min (
item_code VARCHAR,
dt VARCHAR,
dd VARCHAR,
pv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode',
'connector.table' = 'qrcode_pv_five_min',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = 'root',
'connector.write.flush.max-rows' = '1'
);
INSERT INTO pv_five_min
SELECT
itemCode As item_code,
DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
COUNT(*) AS pv
FROM user_behavior
GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;


 



 



--

Best,
Benchao Li