Hi Juho,

sorry for the late response. I found time to look into this issue.

I agree, that the start and end timestamps of the HOP window should be 1 hour apart from each other. I tried to reproduce the issue, but was not able to do so.
Can you maybe open a JIRA and provide a simple test case (collection data source, no Kafka) that reproduces the issue?

Regarding the task that you are trying to solve, have you looked into OVER windows?

The following query would count for each record, how often a record with the same ID combination was observed in the last hour based on its timestamp:

    SELECT
      s_aid1,
      s_cid,
      COUNT(*) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS occurrence,
      rowtime
    FROM events
    WHERE s_aid1 IS NOT NULL

If occurrence is 1, the current record is the only record within the last 1 hour with the combination of aid and cid.
The query does not batch the stream by 10 seconds, but rather produces the results in real-time. If the batching is not required, you should be good by adding a filter on occurrence = 1.
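One way to read the filter suggestion above, as a minimal sketch rather than a tested query: wrap the OVER query in a subquery and keep only the rows where occurrence = 1. It reuses the events table and the column names from this thread. That Flink 1.5-SNAPSHOT plans the nested form exactly as written is an assumption.

```sql
-- Sketch only: emits a record when it is the first one seen for its
-- (s_aid1, s_cid) combination within the preceding hour.
SELECT
  s_aid1,
  s_cid,
  rowtime
FROM (
  -- Inner query: the OVER window count from the reply, unchanged.
  SELECT
    s_aid1,
    s_cid,
    COUNT(*) OVER (
      PARTITION BY s_aid1, s_cid
      ORDER BY rowtime
      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS occurrence,
    rowtime
  FROM events
  WHERE s_aid1 IS NOT NULL
)
-- Outer filter: occurrence = 1 means no other matching record in the last hour.
WHERE occurrence = 1
```

The filter sits in the outer query because occurrence is only defined once the inner OVER aggregation has been evaluated.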
Otherwise, you could add the filter and wrap it by 10 secs tumbling window.

Hope this helps,
Fabian

2018-02-14 15:30 GMT+01:00 Juho Autio <[hidden email]>:

I'm joining a tumbling & hopping window in Flink 1.5-SNAPSHOT. The result is unexpected. Am I doing something wrong? Maybe this is just not a supported join type at all? Any way here goes:

I first register these two tables:

1. new_ids: a tumbling window of seen ids within the last 10 seconds:

    SELECT
      s_aid1,
      s_cid,
      TS_MIN(rowtime) AS first_seen,
      CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
      TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
      TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end
    FROM events
    WHERE s_aid1 IS NOT NULL
    GROUP BY
      s_aid1,
      s_cid,
      TUMBLE(rowtime, INTERVAL '10' SECOND)

2. seen_ids: a sliding window of seen ids 1 hour backwards, 10 second hop:

    SELECT
      s_aid1,
      s_cid,
      TS_MIN(rowtime) AS first_seen,
      CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS DATE) AS processdate,
      HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_start,
      HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end
    FROM events
    WHERE s_aid1 IS NOT NULL
    GROUP BY
      s_aid1,
      s_cid,
      HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)

If I write the results of the "seen_ids" table, the difference between HOP_start and HOP_end is always 1 hour, as expected.

Then I register another query that joins the 2 tables:

unique_ids (mostly including fields for debugging - what I need is the unique, new combinations of s_cid x s_aid1):

    SELECT
      new_ids.s_cid,
      new_ids.s_aid1,
      new_ids.processdate AS processdate,
      seen_ids.processdate AS seen_ids_processdate,
      new_ids.first_seen AS new_ids_first_seen,
      seen_ids.first_seen AS seen_ids_first_seen,
      tumble_start,
      HOP_start,
      tumble_end,
      HOP_end
    FROM new_ids, seen_ids
    WHERE new_ids.s_cid = seen_ids.s_cid
      AND new_ids.s_aid1 = seen_ids.s_aid1
      AND (new_ids.first_seen <= seen_ids.first_seen OR seen_ids.first_seen IS NULL)

I print the results of this table, and surprisingly the HOP_start & HOP_end are only separated by 10 seconds. Is this a bug?

    {
      "s_cid": "appsimulator_236e5fb7",
      "s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26",
      "seen_ids_processdate": "2018-02-14",
      "seen_ids_first_seen": "2018-02-14 11:37:59.0",
      "new_ids_first_seen": "2018-02-14 11:34:33.0",
      "tumble_start": "2018-02-14 11:34:30.0",
      "tumble_end": "2018-02-14 11:34:40.0",
      "HOP_start": "2018-02-14 11:37:50.0",
      "HOP_end": "2018-02-14 11:38:00.0"
    }

What I'm trying to do is exclude the id from the current "new_ids" window if it was already seen before (within the 1 hour scope of "seen_ids"), but that doesn't work either. This example result row also shows that "seen_ids.first_seen" is bigger than it should be.

Even if I can find a fix to this to get what I need, this strategy seems overly complicated. If anyone can suggest a better way, I'd be glad to hear. If this was a batch job, it could be defined simply as:

    SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H')

+ when streaming this query, the new distinct values should be written out every 10 seconds (ONLY the new ones - within that wrapping 1 hour window). So far I haven't been able to figure out how to do that in a simple way with Flink.

*) TS_MIN is a custom function, but it's just a mapping of Flink's MinAggFunction:

    import java.sql.Timestamp

    import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered
    import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
    import org.apache.flink.table.functions.aggfunctions.MaxAggFunction
    import org.apache.flink.table.functions.aggfunctions.MinAggFunction

    object TimestampAggFunctions {

      trait TimestampAggFunction {
        def getInitValue = null
        def getValueTypeInfo = SqlTimeTypeInfo.TIMESTAMP
      }

      class TimestampMinAggFunction extends MinAggFunction[Timestamp] with TimestampAggFunction

      class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] with TimestampAggFunction

    }

    // Registered with:
    tableEnv.registerFunction("TS_MIN", new TimestampMinAggFunction());