Unexpected hop start & end timestamps after stream SQL join


Juho Autio
I'm joining a tumbling & hopping window in Flink 1.5-SNAPSHOT. The result is unexpected. Am I doing something wrong? Maybe this is just not a supported join type at all? Anyway, here goes:

I first register these two tables:

1. new_ids: a tumbling window of seen ids within the last 10 seconds:

SELECT
  s_aid1,
  s_cid,
  TS_MIN(rowtime) AS first_seen,
  CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
  TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
  TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end
FROM events
WHERE s_aid1 IS NOT NULL
GROUP BY
  s_aid1,
  s_cid,
  TUMBLE(rowtime, INTERVAL '10' SECOND)

2. seen_ids: a sliding window of seen ids 1 hour backwards, 10 second hop:

SELECT
  s_aid1,
  s_cid,
  TS_MIN(rowtime) AS first_seen,
  CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS DATE) AS processdate,
  HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_start,
  HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end
FROM events
WHERE s_aid1 IS NOT NULL
GROUP BY
  s_aid1,
  s_cid,
  HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)

If I write the results of the "seen_ids" table, the difference between HOP_start and HOP_end is always 1 hour, as expected.

Then I register another query that joins the 2 tables:

unique_ids (mostly including fields for debugging - what I need is the unique, new combinations of s_cid x s_aid1):

SELECT
   new_ids.s_cid,
   new_ids.s_aid1,
   new_ids.processdate AS processdate,
   seen_ids.processdate AS seen_ids_processdate,
   new_ids.first_seen AS new_ids_first_seen,
   seen_ids.first_seen AS seen_ids_first_seen,
   tumble_start,
   HOP_start,
   tumble_end,
   HOP_end
FROM new_ids, seen_ids
WHERE new_ids.s_cid = seen_ids.s_cid
  AND new_ids.s_aid1 = seen_ids.s_aid1
  AND (new_ids.first_seen <= seen_ids.first_seen OR seen_ids.first_seen IS NULL)

I print the results of this table, and surprisingly the HOP_start & HOP_end are only separated by 10 seconds. Is this a bug?

{
  "s_cid": "appsimulator_236e5fb7",
  "s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26",

  "seen_ids_processdate": "2018-02-14",

  "seen_ids_first_seen": "2018-02-14 11:37:59.0",
  "new_ids_first_seen":  "2018-02-14 11:34:33.0",
  "tumble_start": "2018-02-14 11:34:30.0",
  "tumble_end": "2018-02-14 11:34:40.0",

  "HOP_start": "2018-02-14 11:37:50.0",
  "HOP_end": "2018-02-14 11:38:00.0"
}

What I'm trying to do is exclude the id from the current "new_ids" window if it was already seen before (within the 1 hour scope of "seen_ids"), but that doesn't work either. This example result row also shows that "seen_ids.first_seen" is bigger than it should be.
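
For reference, the following is a minimal plain-Java sketch (not Flink code) of how an epoch-aligned sliding window assigner maps one timestamp to windows. The class and method names are hypothetical, and the example timestamp is the seen_ids_first_seen value from the row above, read as UTC by assumption. Every window it produces is exactly 1 hour long, which is the expectation that HOP_start and HOP_end are being checked against.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class HopWindowSketch {

    // Returns every [start, end) window (epoch millis) that contains timestampMs,
    // newest window first. Aligning window starts to the epoch is an assumption.
    static List<long[]> windowsFor(long timestampMs, long slideMs, long sizeMs) {
        List<long[]> windows = new ArrayList<>();
        long newestStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = newestStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long slideMs = 10_000L;    // INTERVAL '10' SECOND
        long sizeMs = 3_600_000L;  // INTERVAL '1' HOUR
        // seen_ids_first_seen from the example row, read as UTC (assumption).
        long ts = Instant.parse("2018-02-14T11:37:59Z").toEpochMilli();

        List<long[]> windows = windowsFor(ts, slideMs, sizeMs);
        long[] newest = windows.get(0);
        long[] oldest = windows.get(windows.size() - 1);

        System.out.println("windows containing the timestamp: " + windows.size());
        System.out.println("newest: " + Instant.ofEpochMilli(newest[0])
                + " .. " + Instant.ofEpochMilli(newest[1]));
        System.out.println("oldest: " + Instant.ofEpochMilli(oldest[0])
                + " .. " + Instant.ofEpochMilli(oldest[1]));
    }
}
```

With these inputs the timestamp falls into 360 windows. The newest one starts at 11:37:50 and the oldest one ends at 11:38:00, which are the two values in the example row; the thread does not establish whether that is related to the cause.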


Even if I can find a fix to this to get what I need, this strategy seems overly complicated. If anyone can suggest a better way, I'd be glad to hear. If this was a batch job, it could be defined simply as:

SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H')

+ when streaming this query, the new distinct values should be written out every 10 seconds (ONLY the new ones - within that wrapping 1 hour window). So far I haven't been able to figure out how to do that in a simple way with Flink.
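
The intended semantics can be sketched in plain Java (not Flink code): remember each (s_cid, s_aid1, hour) combination and report it only the first time it appears. HourlyDistinctSketch and isNew are hypothetical names, UTC is assumed for the hour bucket, and both the 10-second output batching and any state cleanup are left out.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.Set;

final class HourlyDistinctSketch {

    // Same shape as DATE_FORMAT(rowtime, '%Y%m%d/%H'); the UTC zone is an assumption.
    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("yyyyMMdd/HH").withZone(ZoneOffset.UTC);

    // Combinations already reported. This set is never cleared in this sketch.
    private final Set<String> seen = new HashSet<>();

    // Returns true only the first time a (cid, aid, hour) combination is offered.
    boolean isNew(String cid, String aid, Instant rowtime) {
        String key = cid + "|" + aid + "|" + HOUR.format(rowtime);
        return seen.add(key);
    }

    public static void main(String[] args) {
        HourlyDistinctSketch sketch = new HourlyDistinctSketch();
        String[] rowtimes = {
            "2018-02-14T10:50:00Z", "2018-02-14T10:55:00Z", "2018-02-14T11:10:00Z"
        };
        for (String rowtime : rowtimes) {
            boolean emit = sketch.isNew("TestClient", "id-1", Instant.parse(rowtime));
            System.out.println(rowtime + " emit=" + emit);
        }
    }
}
```

The second event is suppressed because it falls in the same hour as the first, while the third is reported again because it belongs to the next hour.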


*) TS_MIN is a custom function, but it's just a mapping of Flink's MinAggFunction:

import java.sql.Timestamp

import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered

import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
import org.apache.flink.table.functions.aggfunctions.MaxAggFunction
import org.apache.flink.table.functions.aggfunctions.MinAggFunction

object TimestampAggFunctions {

  trait TimestampAggFunction {
    def getInitValue = null
    def getValueTypeInfo = SqlTimeTypeInfo.TIMESTAMP
  }

  class TimestampMinAggFunction extends MinAggFunction[Timestamp] with TimestampAggFunction
  class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] with TimestampAggFunction

}

// Registered with:
tableEnv.registerFunction("TS_MIN", new TimestampMinAggFunction());


Re: Unexpected hop start & end timestamps after stream SQL join

Fabian Hueske-2
Hi Juho,

Sorry for the late response. I found time to look into this issue.
I agree that the start and end timestamps of the HOP window should be 1 hour apart. I tried to reproduce the issue but was not able to.
Could you open a JIRA and provide a simple test case (collection data source, no Kafka) that reproduces the issue?

Regarding the task that you are trying to solve, have you looked into OVER windows?

The following query would count for each record, how often a record with the same ID combination was observed in the last hour based on its timestamp:

SELECT
  s_aid1,
  s_cid,
  COUNT(*) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS occurrence,
  rowtime
FROM events
WHERE s_aid1 IS NOT NULL

If occurrence is 1, the current record is the only record within the last hour with that combination of aid and cid.
The query does not batch the stream by 10 seconds, but rather produces the results in real time. If the batching is not required, adding a filter on occurrence = 1 should be enough.
Otherwise, you could add the filter and wrap the query in a 10-second tumbling window.
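
As an illustration of what the OVER aggregate computes, here is a small plain-Java sketch (not Flink code) that counts, for each row, the rows of the same key whose timestamp lies within the preceding hour up to and including the row's own timestamp. The class name, method name, and sample data are made up for the example.

```java
import java.util.Arrays;

final class OverRangeCountSketch {

    // For each row i, counts the rows j with the same key and
    // ts[i] - rangeMs <= ts[j] <= ts[i] (both bounds inclusive).
    static int[] occurrences(String[] keys, long[] ts, long rangeMs) {
        int[] counts = new int[ts.length];
        for (int i = 0; i < ts.length; i++) {
            for (int j = 0; j < ts.length; j++) {
                boolean sameKey = keys[i].equals(keys[j]);
                boolean inRange = ts[j] <= ts[i] && ts[j] >= ts[i] - rangeMs;
                if (sameKey && inRange) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        String[] keys = {"A", "B", "A", "A"};
        long[] ts = {0, 5 * minute, 20 * minute, 90 * minute};

        int[] occurrence = occurrences(keys, ts, 60 * minute);
        System.out.println(Arrays.toString(occurrence));

        // Equivalent of the suggested filter: keep rows with occurrence = 1.
        for (int i = 0; i < ts.length; i++) {
            if (occurrence[i] == 1) {
                System.out.println("keep " + keys[i] + " at minute " + ts[i] / minute);
            }
        }
    }
}
```

In the sample data, key A appears at minutes 0, 20, and 90. The row at minute 20 gets occurrence 2 and would be filtered out, while the row at minute 90 gets occurrence 1 again because the earlier rows are more than an hour old.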

Hope this helps,
Fabian


In reply to Juho Autio, 2018-02-14 15:30 GMT+01:00.

Re: Unexpected hop start & end timestamps after stream SQL join

Juho Autio
Thanks for the hint! For some reason it isn't catching all distinct values (even though it's a much simpler way than what I initially tried and seems good in that sense). First of all, isn't this like a sliding window: "rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW"?

My use case needs a tumbling window. I tried adding PARTITION BY additionally with DATE_FORMAT(rowtime, '%Y%m%d%H') to achieve the same result as with a tumbling window; this resulted in slightly more distinct values, but was still missing some! Would there be some nice way to create a tumbling window right in the RANGE condition instead?

As a disclaimer I have to say we seem to be fine using a simple window _without_ any early triggering. But of course it would be nice to understand how early triggering could be enabled in a simple & scalable way.

Cheers,
Juho

In reply to Fabian Hueske, Mon, Feb 19, 2018 at 1:44 PM.

Re: Unexpected hop start & end timestamps after stream SQL join

Fabian Hueske-2
Hi Juho,

a query with an OVER aggregation should emit exactly one row for each input row.
Does your comment on "isn't catching all distinct values" mean that this is not the case?

You can combine tumbling windows and over aggregates also by nesting queries as shown below:

SELECT
  s_aid1,
  s_cid,
  first_seen,
  MIN(first_seen) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS first_seen_1h,
  processdate,
  tumble_start,
  tumble_end
FROM (
  SELECT
    s_aid1,
    s_cid,
    TS_MIN(rowtime) AS first_seen,
    CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
    TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
    TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end,
    TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS rowtime
  FROM events
  WHERE s_aid1 IS NOT NULL
  GROUP BY
    s_aid1,
    s_cid,
    TUMBLE(rowtime, INTERVAL '10' SECOND)
  )

Early triggering is not yet supported for SQL queries.

Best, Fabian

In reply to Juho Autio, 2018-02-27 15:20 GMT+01:00.

Re: Unexpected hop start & end timestamps after stream SQL join

Juho Autio
> a query with an OVER aggregation should emit exactly one row for each input row. 
> Does your comment on "isn't catching all distinct values" mean that this is not the case?

That's not really what I meant. The problem is that some ids are not received at all for some time windows.

I did this as you suggested, this part works (there are no duplicates):

        Table result = tableEnv.sql(uniqueIdsSql);
        // remove duplicates (this is a trick to get only distinct values but get them asap)
        result = result.filter("occurrence = 1");

I'm seeing all ids at least once, but they are missing from some time windows in which they also occurred. So it seems like the uniqueness is not properly scoped to the time windows. I don't see why that would be.

I can try to create a simplified class to hopefully reproduce this problem. Maybe also for the original issue that I encountered with hop start/end timestamps after a join.

Thanks for looking into this so far!

On Tue, Feb 27, 2018 at 4:45 PM, Fabian Hueske <[hidden email]> wrote:
Hi Juho,

a query with an OVER aggregation should emit exactly one row for each input row.
Does your comment on "isn't catching all distinct values" mean that this is not the case?

You can combine tumbling windows and over aggregates also by nesting queries as shown below:

SELECT
  s_aid1,
  s_cid,
  first_seen,
  MIN(first_seen) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS first_seen_1h,
  processdate,
  tumble_start,
  tumble_end
FROM (
  SELECT
    s_aid1,
    s_cid,
    TS_MIN(rowtime) AS first_seen,
    CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
    TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
    TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end,
    TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS rowtime
  FROM events
  WHERE s_aid1 IS NOT NULL
  GROUP BY
    s_aid1,
    s_cid,
    TUMBLE(rowtime, INTERVAL '10' SECOND)
  )

Early triggering is not yet supported for SQL queries.

Best, Fabian

2018-02-27 15:20 GMT+01:00 Juho Autio <[hidden email]>:
Thanks for the hint! For some reason it isn't catching all distinct values (even though it's a much simpler way than what I initially tried and seems good in that sense). First of all, isn't this like a sliding window: "rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW"?

My use case needs a tumbling window. I tried adding PARTITION BY additionally with DATE_FORMAT(rowtime, '%Y%m%d%H') to achieve the same result as with a tumbling window; this resulted in slightly more distinct values, but was still missing some! Would there by some nice way to create a tumbling window right in the RANGE condition instead?

As a disclaimer I have to say we seem to be fine using a simple window _without_ any early triggering. But of course it would be nice to understand how early triggering could be enabled in a simple & scalable way.

Cheers,
Juho

On Mon, Feb 19, 2018 at 1:44 PM, Fabian Hueske <[hidden email]> wrote:
Hi Juho,

sorry for the late response. I found time to look into this issue.
I agree, that the start and end timestamps of the HOP window should be 1 hour apart from each other. I tried to reproduce the issue, but was not able to do so.
Can you maybe open a JIRA and provide a simple test case (collection data source, no Kafka) that reproduces the issue?

Regarding the task that you are trying to solve, have you looked into OVER windows?

The following query would count for each record, how often a record with the same ID combination was observed in the last hour based on its timestamp:

SELECT
  s_aid1,
  s_cid,
  COUNT(*) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS occurrence,
  rowtime
FROM events
WHERE s_aid1 IS NOT NULL

If occurrence is 1, the current record is the only record within the last 1 hour with the combination of aid and cid .
The query does not batch the stream by 10 seconds, but rather produces the results in real-time. If the batching is not required, you should be good by adding a filter on occurrence = 1.
Otherwise, you could add the filter and wrap it by 10 secs tumbling window.

Hope this helps,
Fabian


2018-02-14 15:30 GMT+01:00 Juho Autio <[hidden email]>:
I'm joining a tumbling & hopping window in Flink 1.5-SNAPSHOT. The result is unexpected. Am I doing something wrong? Maybe this is just not a supported join type at all? Any way here goes:

I first register these two tables:

1. new_ids: a tumbling window of seen ids within the last 10 seconds:

SELECT
  s_aid1,
  s_cid,
  TS_MIN(rowtime) AS first_seen,
  CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
  TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
  TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end
FROM events
WHERE s_aid1 IS NOT NULL
GROUP BY
  s_aid1,
  s_cid,
  TUMBLE(rowtime, INTERVAL '10' SECOND)

2. seen_ids: a sliding window of seen ids 1 hour backwards, 10 second hop:

SELECT
  s_aid1,
  s_cid,
  TS_MIN(rowtime) AS first_seen,
  CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS DATE) AS processdate,
  HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_start,
  HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end
FROM events
WHERE s_aid1 IS NOT NULL
GROUP BY
  s_aid1,
  s_cid,
  HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)

If I write the results of the "seen_ids" table, the difference between HOP_start and HOP_end is always 1 hour, as expected.

Then I register another query that joins the 2 tables:

unique_ids (mostly including fields for debugging - what I need is the unique, new combinations of s_cid x s_aid1):

SELECT
   new_ids.s_cid,
   new_ids.s_aid1,
   new_ids.processdate AS processdate,
   seen_ids.processdate AS seen_ids_processdate,
   new_ids.first_seen AS new_ids_first_seen,
   seen_ids.first_seen AS seen_ids_first_seen,
   tumble_start,
   HOP_start,
   tumble_end,
   HOP_end
FROM new_ids, seen_ids
WHERE new_ids.s_cid = seen_ids.s_cid
  AND new_ids.s_aid1 = seen_ids.s_aid1
  AND (new_ids.first_seen <= seen_ids.first_seen OR seen_ids.first_seen IS NULL)

I print the results of this table, and surprisingly the HOP_start & HOP_end are only separated by 10 seconds. Is this a bug?

{
  "s_cid": "appsimulator_236e5fb7",
"s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26",

"seen_ids_processdate": "2018-02-14",

"seen_ids_first_seen": "2018-02-14 11:37:59.0",
"new_ids_first_seen":  "2018-02-14 11:34:33.0",
"tumble_start": "2018-02-14 11:34:30.0",
"tumble_end": "2018-02-14 11:34:40.0",

"HOP_start": "2018-02-14 11:37:50.0",
"HOP_end": "2018-02-14 11:38:00.0"
}

What I'm trying to do is exclude the id from the current "new_ids" window if it was already seen before (within the 1 hour scope of "seen_ids"), but that doesn't work either. This example result row also shows that "seen_ids.first_seen" is bigger than it should be.


Even if I can find a fix to this to get what I need, this strategy seems overly complicated. If anyone can suggest a better way, I'd be glad to hear. If this was a batch job, it could be defined simply as:

SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H')

+ when streaming this query, the new distinct values should be written out every 10 seconds (ONLY the new ones - within that wrapping 1 hour window). So far I haven't been able to figure out how to do that in a simple way with Flink.


*) TS_MIN is a custom function, but it's just a mapping of Flink's MinAggFunction:

import java.sql.Timestamp

import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered

import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
import org.apache.flink.table.functions.aggfunctions.MaxAggFunction
import org.apache.flink.table.functions.aggfunctions.MinAggFunction

object TimestampAggFunctions {

  trait TimestampAggFunction {
    def getInitValue = null
    def getValueTypeInfo = SqlTimeTypeInfo.TIMESTAMP
  }

  class TimestampMinAggFunction extends MinAggFunction[Timestamp] with TimestampAggFunction
  class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] with TimestampAggFunction

}

// Registered with:
tableEnv.registerFunction("TS_MIN", new TimestampMinAggFunction());






Re: Unexpected hop start & end timestamps after stream SQL join

Juho Autio
Actually looks like I found why the "count(*) AS occurrence" + filter "occurrence = 1" doesn't work. If there are multiple events with the same event time, they get handled together and share the value for count(*). I printed out some rows before the filter* and this is what I get:

4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.667+0000","occurrence":2}
4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.667+0000","occurrence":2}
4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.668+0000","occurrence":5}
4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.668+0000","occurrence":5}
4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.668+0000","occurrence":5}

Note that I'm now actually using INTERVAL '1' DAY instead of '1' HOUR. To match that, formatting is: "CAST(DATE_FORMAT(rowtime, '%Y%m%d') AS VARCHAR) AS processdate".

So at least the count(*) trick won't work just like that.
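
The effect can be reproduced outside Flink with a few lines of plain Java. This is only a sketch of how a RANGE frame treats rows with equal timestamps, not Flink code; the class and method names are hypothetical, and the timestamps are the second and millisecond parts of the five rows printed above. Since both rows at the earliest timestamp already see each other, no row ever gets occurrence = 1 and the filter drops the id entirely.

```java
import java.util.Arrays;

final class RangeCountSketch {

    /** COUNT(*) over a RANGE frame [ts - interval, ts] for the sorted timestamps of one partition. */
    static int[] rangeCounts(long[] sortedTs, long intervalMs) {
        int[] counts = new int[sortedTs.length];
        for (int i = 0; i < sortedTs.length; i++) {
            int c = 0;
            for (long other : sortedTs) {
                // The frame is defined on values, so every row with the same timestamp is included.
                if (other >= sortedTs[i] - intervalMs && other <= sortedTs[i]) {
                    c++;
                }
            }
            counts[i] = c;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {5667, 5667, 5668, 5668, 5668}; // 08:54:05.667 twice, 08:54:05.668 three times
        int[] counts = rangeCounts(ts, 86_400_000L); // INTERVAL '1' DAY in milliseconds
        System.out.println(Arrays.toString(counts)); // [2, 2, 5, 5, 5]
        long passing = Arrays.stream(counts).filter(c -> c == 1).count();
        System.out.println("rows passing occurrence = 1: " + passing); // rows passing occurrence = 1: 0
    }
}
```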

For me it's fine to just say that early firing is not possible. Trying to achieve that with some kind of workaround also seems a bit risky to me, as the above attempt shows. Maybe it's best to wait for this to be supported properly. As I said, I don't really seem to need early firing right now, because writing out all distinct values once the window closes is not too slow for us at the moment.

Thanks again,
Juho

*) The full query was:

SELECT
  s_aid1,
  s_cid,
  CAST(DATE_FORMAT(rowtime, '%Y%m%d') AS VARCHAR) AS processdate,
  rowtime,
  COUNT(*) OVER (
    PARTITION BY s_aid1, s_cid, CAST(DATE_FORMAT(rowtime, '%Y%m%d') AS VARCHAR)
    ORDER BY rowtime
    RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
  ) AS occurrence
FROM events
WHERE s_aid1 IS NOT NULL AND s_aid1 <> '' AND s_cid IS NOT NULL

On Tue, Feb 27, 2018 at 7:52 PM, Juho Autio <[hidden email]> wrote:
> a query with an OVER aggregation should emit exactly one row for each input row. 
> Does your comment on "isn't catching all distinct values" mean that this is not the case?

Not really what I meant? The problem is that some ids are not received at all for some time windows.

I did this as you suggested, this part works (there are no duplicates):

        Table result = tableEnv.sql(uniqueIdsSql);
        // remove duplicates (this is a trick to get only distinct values but get them asap)
        result = result.filter("occurrence = 1");

I'm seeing all ids at least once, but missing them from some time windows where they occurred as well. So it seems like the uniqueness is not properly scoped to the time windows. I don't see why not..

I can try to create a simplified class to hopefully reproduce this problem. Maybe also for the original issue that I encountered with hop start/end timestamps after a join.

Thanks for looking into this so far!

On Tue, Feb 27, 2018 at 4:45 PM, Fabian Hueske <[hidden email]> wrote:
Hi Juho,

a query with an OVER aggregation should emit exactly one row for each input row.
Does your comment on "isn't catching all distinct values" mean that this is not the case?

You can combine tumbling windows and over aggregates also by nesting queries as shown below:

SELECT
  s_aid1,
  s_cid,
  first_seen,
  MIN(first_seen) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS first_seen_1h,
  processdate,
  tumble_start,
  tumble_end
FROM (
  SELECT
    s_aid1,
    s_cid,
    TS_MIN(rowtime) AS first_seen,
    CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
    TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
    TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end,
    TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS rowtime
  FROM events
  WHERE s_aid1 IS NOT NULL
  GROUP BY
    s_aid1,
    s_cid,
    TUMBLE(rowtime, INTERVAL '10' SECOND)
  )

Early triggering is not yet supported for SQL queries.

Best, Fabian

2018-02-27 15:20 GMT+01:00 Juho Autio <[hidden email]>:
Thanks for the hint! For some reason it isn't catching all distinct values (even though it's a much simpler way than what I initially tried and seems good in that sense). First of all, isn't this like a sliding window: "rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW"?

My use case needs a tumbling window. I tried adding PARTITION BY additionally with DATE_FORMAT(rowtime, '%Y%m%d%H') to achieve the same result as with a tumbling window; this resulted in slightly more distinct values, but was still missing some! Would there be some nice way to create a tumbling window right in the RANGE condition instead?

As a disclaimer I have to say we seem to be fine using a simple window _without_ any early triggering. But of course it would be nice to understand how early triggering could be enabled in a simple & scalable way.

Cheers,
Juho

On Mon, Feb 19, 2018 at 1:44 PM, Fabian Hueske <[hidden email]> wrote:
Hi Juho,

sorry for the late response. I found time to look into this issue.
I agree that the start and end timestamps of the HOP window should be 1 hour apart from each other. I tried to reproduce the issue, but was not able to do so.
Can you maybe open a JIRA and provide a simple test case (collection data source, no Kafka) that reproduces the issue?

Regarding the task that you are trying to solve, have you looked into OVER windows?

The following query would count for each record, how often a record with the same ID combination was observed in the last hour based on its timestamp:

SELECT
  s_aid1,
  s_cid,
  COUNT(*) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS occurrence,
  rowtime
FROM events
WHERE s_aid1 IS NOT NULL

If occurrence is 1, the current record is the only record within the last hour with that combination of aid and cid.
The query does not batch the stream by 10 seconds, but rather produces the results in real time. If the batching is not required, adding a filter on occurrence = 1 should be enough.
Otherwise, you could add the filter and wrap the query in a 10-second tumbling window.

Hope this helps,
Fabian



Re: Unexpected hop start & end timestamps after stream SQL join

Fabian Hueske-2
Hi Juho,

I have to admit I lost track a bit of what you are trying to compute.
I also don't understand the problem with the missing ids.

The query that you shared in the last mail will return for each record with a valid s_aid1, s_cid combination how often the id combination has been seen so far on the current day.
At the beginning of each day, the count is reset to 0 (due to the date in the PARTITION BY clause).
The semantics that all records with the same timestamp are aggregated together are intentional and defined by the SQL standard.

Something that you could try is to use a COUNT(*) with a "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" OVER window.
For ROWS windows (in contrast to RANGE windows), each row should be individually counted, i.e., rows with the same timestamp get different counts.
Since you added the day to the partitioning clause, the windows will always be scoped per day.
In that case, you need to configure a state retention time [1] of at least a day, to ensure that old windows are removed after the day.
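
As a rough illustration of the ROWS behaviour described above, the following plain-Java sketch keeps a running count per partition key (id combination plus day). It is not Flink code and the names are hypothetical. Each row gets its own count even when timestamps are identical, so exactly one row per partition has count 1. Note that the map in this sketch never drops entries for past days; cleaning those up is what the state retention setting would be needed for in Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RowsCountSketch {

    /** Running COUNT(*) per partition, like ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. */
    static List<Integer> rowsCounts(List<String> partitionKeys) {
        Map<String, Integer> running = new HashMap<>();
        List<Integer> counts = new ArrayList<>();
        for (String key : partitionKeys) {
            // merge() increments the per-partition counter and returns the new value.
            counts.add(running.merge(key, 1, Integer::sum));
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three rows for the same id on one day (timestamps do not matter here), one row on the next day.
        List<String> keys = Arrays.asList(
                "id1|TestClient|20180219",
                "id1|TestClient|20180219",
                "id1|TestClient|20180219",
                "id1|TestClient|20180220");
        System.out.println(rowsCounts(keys)); // [1, 2, 3, 1]
    }
}
```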

