Propagating event time field from nested query


Propagating event time field from nested query

Piyush Narang

Hi folks,

 

We have a Flink streaming Table / SQL job that we were looking to migrate from an older Flink release (1.6.x) to 1.9. As part of doing so, we have been seeing a few errors that I've been trying to figure out how to work around. Would appreciate any help / pointers.

The job essentially involves a nested query:

SELECT `timestamp`, cost, partnerid, impression_id, …
FROM my_kafka_stream

 

The Kafka stream has a ‘timestamp’ field that tracks event time. We register this nested query as “base_query”.
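For concreteness, the registration looks roughly like this (a minimal sketch in the Java Table API, assuming the usual flink-table imports and a StreamTableEnvironment named tableEnv; our actual setup code differs):

// Sketch: register the nested projection as "base_query" so the outer
// aggregation queries can reference it by name (Flink 1.9 Table API).
Table baseQuery = tableEnv.sqlQuery(
    "SELECT `timestamp`, cost, partnerid, impression_id " +
    "FROM my_kafka_stream");
tableEnv.registerTable("base_query", baseQuery);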

 

We now use this in a couple of outer aggregation queries, which differ only in the time window we aggregate over (1M, 1H, 6H, etc.):

SELECT
  CAST(SUM(cost) AS FLOAT) AS CostPerPartner,
  COUNT(impression_id) AS ImpsPerPartner,
  …
FROM
  base_query
GROUP BY
  partnerid,
  HOP(`timestamp`, INTERVAL '30' SECOND, INTERVAL '1' MINUTE)
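Since only the window parameters change across these outer queries, one way to generate them is from a template, something like this (hypothetical sketch, not our exact code; the hourly slide shown is illustrative):

// Sketch: stamp out the outer aggregations from one SQL template,
// varying only the HOP slide and size per query.
String template =
    "SELECT partnerid, CAST(SUM(cost) AS FLOAT) AS CostPerPartner, " +
    "  COUNT(impression_id) AS ImpsPerPartner " +
    "FROM base_query " +
    "GROUP BY partnerid, HOP(`timestamp`, %s, %s)";
Table perMinute = tableEnv.sqlQuery(
    String.format(template, "INTERVAL '30' SECOND", "INTERVAL '1' MINUTE"));
Table perHour = tableEnv.sqlQuery(
    String.format(template, "INTERVAL '5' MINUTE", "INTERVAL '1' HOUR"));  // slide assumed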

 

While the outer query would get translated and scheduled as a Flink streaming job just fine on 1.6, we are running into this error when we try to bump our build to 1.9:

“Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.”

 

Any suggestions on how we could work around this? I saw a thread suggesting HOP_ROWTIME, but if I understand correctly, that would mean doing the hop window generation / GROUP BY in the nested query, which we’d like to avoid (as we have a couple of time window combinations to generate).
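If I read that suggestion right, it would look something like this (sketch only; this is the shape we’re trying to avoid):

// Sketch of the HOP_ROWTIME approach: the window (and GROUP BY) moves
// into the inner query, and HOP_ROWTIME exposes a fresh rowtime
// attribute that outer queries could then window on again.
Table innerWindowed = tableEnv.sqlQuery(
    "SELECT partnerid, SUM(cost) AS cost, " +
    "  HOP_ROWTIME(`timestamp`, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS rowtime " +
    "FROM my_kafka_stream " +
    "GROUP BY partnerid, HOP(`timestamp`, INTERVAL '30' SECOND, INTERVAL '1' MINUTE)");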

 

Thanks,

-- Piyush

 


Re: Propagating event time field from nested query

Dawid Wysakowicz

Hi Piyush,

Could you verify that the `timestamp` field in the table my_kafka_stream is of TIMESTAMP(3) *ROWTIME* type? Could you share how you create this table?

What you are doing should work; what I suspect is that the type of the `timestamp` field in `my_kafka_stream` changed somehow.
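One quick way to check (sketch; assumes a StreamTableEnvironment named tableEnv):

// Sketch: print the schema of the registered table. A usable event-time
// attribute should show up as TIMESTAMP(3) *ROWTIME*, not plain TIMESTAMP(3).
tableEnv.scan("my_kafka_stream").printSchema();
// expected output, roughly:
// root
//  |-- timestamp: TIMESTAMP(3) *ROWTIME*
//  |-- cost: ...
//  |-- partnerid: ...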

Best,

Dawid


Re: Propagating event time field from nested query

Piyush Narang

Hi Dawid,

 

Thanks for getting back. The *ROWTIME* modifier did ring a bell, and I was able to find the issue. We are registering the inner table correctly (`timestamp` is of TIMESTAMP(3) *ROWTIME* type), but we had an intermediate step where we converted that table to a DataStream to optionally add custom triggers, and then re-registered it and tried to use the resulting table. When re-registering the DataStream we were converting the TIMESTAMP(3) *ROWTIME* field to a plain TIMESTAMP field (I think because we hit a warning that the rowtime timestamp wasn’t part of the public API).

 

We didn’t actually need the intermediate DataStream, so when I dropped that step and used the inner table directly (with the rowtime timestamp), things worked as expected!
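For anyone who does need the DataStream detour, the fix is presumably to re-declare the event-time attribute when registering the stream back as a table, roughly like this (sketch; baseQuery is the inner table from my first mail, the field list is abbreviated, and the .rowtime re-declaration is the part that matters):

// Sketch: round-trip through a DataStream while keeping event time.
// The ".rowtime" suffix tells the Table API to restore `timestamp`
// as a TIMESTAMP(3) *ROWTIME* attribute instead of a plain field.
DataStream<Row> stream = tableEnv.toAppendStream(baseQuery, Row.class);
// ... optional custom triggers / transformations on `stream` ...
tableEnv.registerDataStream(
    "base_query",
    stream,
    "timestamp.rowtime, cost, partnerid, impression_id");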

 

Thanks,

 

-- Piyush

 

 
