Flink - sending clicks+impressions to AWS Personalize

Dan
I want to try using AWS Personalize <https://aws.amazon.com/personalize/> to get content recommendations. One of the fields on the input (click) event is a list of recent impressions.

E.g. 
{
  ...
  eventType: 'click',
  eventId: 'click-1',
  itemId: 'item-1',
  impression: ['item-2', 'item-3', 'item-4', 'item-5', ....],
}

Is there a way to produce this output using Flink SQL?

I tried doing a version of this but get the following error:
"Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before."

Here is a simplified version of the query.


SELECT
    "user".user_id AS userId,
    "view".session_id AS sessionId,
    click.click_id AS eventId,
    CAST(click.ts AS BIGINT) AS sentAt,
    insertion.content_id AS itemId,
    impression_content_ids AS impression
FROM "user"
RIGHT JOIN "view"
    ON "user".log_user_id = "view".log_user_id
    AND "user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts + INTERVAL '1' HOUR
JOIN insertion
    ON "view".view_id = insertion.view_id
    AND "view".ts BETWEEN insertion.ts - INTERVAL '1' HOUR AND insertion.ts + INTERVAL '1' HOUR
JOIN impression
    ON insertion.insertion_id = impression.insertion_id
    AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
JOIN (
    SELECT log_user_id, CAST(COLLECT(DISTINCT impression_content_id) AS ARRAY<STRING>) AS impression_content_ids
    FROM (
        SELECT insertion.log_user_id AS log_user_id,
            ROW_NUMBER() OVER (PARTITION BY insertion.log_user_id ORDER BY impression.ts DESC) AS row_num,
            insertion.content_id AS impression_content_id
        FROM insertion
        JOIN impression
            ON insertion.insertion_id = impression.insertion_id
            AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
        GROUP BY insertion.log_user_id, impression.ts, insertion.content_id
    ) WHERE row_num <= 25
    GROUP BY log_user_id
) ON insertion.insertion_id = impression.insertion_id
    AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
LEFT JOIN click
    ON impression.impression_id = click.impression_id
    AND impression.ts BETWEEN click.ts - INTERVAL '12' HOUR AND click.ts + INTERVAL '12' HOUR

Re: Flink - sending clicks+impressions to AWS Personalize

Timo Walther
Hi Dan,

The exception you are getting is a frequently encountered limitation in
Flink SQL at the moment.

I tried to summarize the issue recently here:

https://stackoverflow.com/questions/64445207/rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join-despite-usi/64500296#64500296

The query is quite complex. It seems that some JOIN is not recognized as
a streaming interval join. Maybe you can split up the big query into
individual subqueries and verify the plan of each one using
`TableEnvironment.explainSql()` to figure out which join causes the
exception.
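
As a rough sketch of that approach (not tested against this schema), one join from the query above can be isolated in a view and explained on its own. The view name `insertion_impression` and the alias `impression_ts` are made up for illustration; the tables and columns are the ones from the original query. `EXPLAIN PLAN FOR` is used here as the SQL counterpart of `explainSql()`, assuming a Flink version that supports both it and `CREATE TEMPORARY VIEW`.

```sql
-- Step 1: isolate a single join from the big query.
-- "insertion_impression" is a hypothetical name, not part of the original job.
CREATE TEMPORARY VIEW insertion_impression AS
SELECT
    insertion.log_user_id,
    insertion.insertion_id,
    insertion.content_id,
    impression.impression_id,
    impression.ts AS impression_ts
FROM insertion
JOIN impression
    ON insertion.insertion_id = impression.insertion_id
    AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR
                         AND impression.ts + INTERVAL '1' HOUR;

-- Step 2: print the plan for just this piece.
EXPLAIN PLAN FOR
SELECT * FROM insertion_impression;
```

If the optimized plan for a piece shows an interval join operator, that join was recognized as a streaming interval join. If it shows a plain join, or the statement fails with the rowtime exception, that piece is the likely culprit. Repeating this for each join, including the aggregated subquery, should narrow down the problem.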

Regards,
Timo


On 16.12.20 03:40, Dan Hill wrote:

> [...]