I want to try using AWS Personalize (https://aws.amazon.com/personalize/) to get content recommendations. One of the fields on the input (click) event is a list of recent impressions, e.g.:

    {
      ...
      eventType: 'click',
      eventId: 'click-1',
      itemId: 'item-1',
      impression: ['item-2', 'item-3', 'item-4', 'item-5', ....],
    }

Is there a way to produce this output using Flink SQL?

I tried a version of this but got the following error:

    "Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before."
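The workaround suggested by the error message can be sketched as a view that casts the time attribute away before the data enters a regular join. This is a minimal sketch, assuming the `impression` table from the query below declares `ts` as a `TIMESTAMP(3)` rowtime attribute; the view name `impression_no_rowtime` is hypothetical.

```sql
-- Hypothetical view over the `impression` table used in the query below.
-- Assumption: `ts` is the table's TIMESTAMP(3) rowtime attribute.
-- The CAST materializes the rowtime attribute into a plain TIMESTAMP(3)
-- column, so rows from this view may enter a regular (non-interval) join.
CREATE TEMPORARY VIEW impression_no_rowtime AS
SELECT
  impression_id,
  insertion_id,
  CAST(ts AS TIMESTAMP(3)) AS ts
FROM impression;
```

The trade-off: a join on the cast column is no longer an interval join, so Flink runs it as a regular join that keeps the rows of both inputs in state unless idle-state retention is configured, and the cast `ts` can no longer drive event-time operations such as windows further downstream.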
Here is a simplified version of the query.

    SELECT
      "user".user_id AS userId,
      "view".session_id AS sessionId,
      click.click_id AS eventId,
      CAST(click.ts AS BIGINT) AS sentAt,
      insertion.content_id AS itemId,
      impression_content_ids AS impression
    FROM "user"
    RIGHT JOIN "view"
      ON "user".log_user_id = "view".log_user_id
      AND "user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts + INTERVAL '1' HOUR
    JOIN insertion
      ON view.view_id = insertion.view_id
      AND view.ts BETWEEN insertion.ts - INTERVAL '1' HOUR AND insertion.ts + INTERVAL '1' HOUR
    JOIN impression
      ON insertion.insertion_id = impression.insertion_id
      AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
    JOIN (
      SELECT
        log_user_id,
        CAST(COLLECT(DISTINCT impression_content_id) AS ARRAY<STRING>) AS impression_content_ids
      FROM (
        SELECT
          insertion.log_user_id AS log_user_id,
          ROW_NUMBER() OVER (PARTITION BY insertion.log_user_id ORDER BY impression.ts DESC) AS row_num,
          insertion.content_id AS impression_content_id
        FROM insertion
        JOIN impression
          ON insertion.insertion_id = impression.insertion_id
          AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
        GROUP BY insertion.log_user_id, impression.ts, insertion.content_id
      )
      WHERE row_num <= 25
      GROUP BY log_user_id
    )
      ON insertion.insertion_id = impression.insertion_id
      AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
    LEFT JOIN click
      ON impression.impression_id = click.impression_id
      AND impression.ts BETWEEN click.ts - INTERVAL '12' HOUR AND click.ts + INTERVAL '12' HOUR
Hi Dan,
The exception you are getting is a very common limitation of Flink SQL at the moment. I tried to summarize the issue recently here: https://stackoverflow.com/questions/64445207/rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join-despite-usi/64500296#64500296

The query is quite complex. It seems that one of the JOINs is not recognized as a streaming interval join. Maybe you can split the big query into individual subqueries and verify the plan of each one using `TableEnvironment.explainSql()` to figure out which join causes the exception.

Regards,
Timo

On 16.12.20 03:40, Dan Hill wrote:
> I want to try using AWS Personalize
> <https://aws.amazon.com/personalize/> to get content recommendations.
> [...]
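Timo's suggestion to split the query and inspect each piece can be sketched in SQL with Flink's `EXPLAIN PLAN FOR` statement, which should yield the same plan explanation as passing the inner query string to `TableEnvironment.explainSql()`. This is a sketch under assumptions: the tables and columns are the ones from Dan's query, each table is assumed to declare `ts` as its rowtime attribute, and the two fragments are a hypothetical way of cutting the query. In the Blink planner's optimized plan an interval join typically appears as an `IntervalJoin` node and a regular join as `Join` (node names may differ between Flink versions). If a fragment contains the offending regular join, the EXPLAIN itself is expected to fail with the same "Rowtime attributes must not be in the input rows of a regular join" error, which narrows down the culprit.

```sql
-- Fragment 1 (hypothetical split of the query above): only the
-- insertion/impression join. Assumes `ts` is the rowtime attribute
-- of both tables, as in the original query.
EXPLAIN PLAN FOR
SELECT
  insertion.insertion_id,
  insertion.log_user_id,
  insertion.content_id,
  impression.impression_id
FROM insertion
JOIN impression
  ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR
                       AND impression.ts + INTERVAL '1' HOUR;

-- Fragment 2: the same join plus the LEFT JOIN to click.
-- Keep adding one join at a time until the EXPLAIN fails or the
-- plan shows a regular Join where an interval join was intended.
EXPLAIN PLAN FOR
SELECT
  insertion.insertion_id,
  impression.impression_id,
  click.click_id
FROM insertion
JOIN impression
  ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR
                       AND impression.ts + INTERVAL '1' HOUR
LEFT JOIN click
  ON impression.impression_id = click.impression_id
  AND impression.ts BETWEEN click.ts - INTERVAL '12' HOUR
                        AND click.ts + INTERVAL '12' HOUR;
```

Each statement is meant to be run on its own; the aggregated `COLLECT` subquery can be checked the same way once the plain joins are known to plan as interval joins.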