I'm joining a tumbling and a hopping window in Flink 1.5-SNAPSHOT, and the result is unexpected. Am I doing something wrong, or is this join type simply not supported? Anyway, here goes:
I first register these two tables:
1. new_ids: a tumbling window of seen ids within the last 10 seconds:
SELECT
  s_aid1,
  s_cid,
  TS_MIN(rowtime) AS first_seen,
  CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
  TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
  TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end
FROM events
WHERE s_aid1 IS NOT NULL
GROUP BY
  s_aid1,
  s_cid,
  TUMBLE(rowtime, INTERVAL '10' SECOND)
2. seen_ids: a sliding window of seen ids 1 hour backwards, 10 second hop:
SELECT
  s_aid1,
  s_cid,
  TS_MIN(rowtime) AS first_seen,
  CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS DATE) AS processdate,
  HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_start,
  HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end
FROM events
WHERE s_aid1 IS NOT NULL
GROUP BY
  s_aid1,
  s_cid,
  HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
Then I join them as unique_ids (most of the fields are only there for debugging; what I actually need is the unique, new combinations of s_cid x s_aid1):
SELECT
  new_ids.s_cid,
  new_ids.s_aid1,
  new_ids.processdate AS processdate,
  seen_ids.processdate AS seen_ids_processdate,
  new_ids.first_seen AS new_ids_first_seen,
  seen_ids.first_seen AS seen_ids_first_seen,
  tumble_start,
  HOP_start,
  tumble_end,
  HOP_end
FROM new_ids, seen_ids
WHERE new_ids.s_cid = seen_ids.s_cid
  AND new_ids.s_aid1 = seen_ids.s_aid1
  AND (new_ids.first_seen <= seen_ids.first_seen OR seen_ids.first_seen IS NULL)
When I print the results of this table, HOP_start and HOP_end are surprisingly only 10 seconds apart, although the window size is 1 hour. Is this a bug?
{
  "s_cid": "appsimulator_236e5fb7",
  "s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26",
  "seen_ids_processdate": "2018-02-14",
  "seen_ids_first_seen": "2018-02-14 11:37:59.0",
  "new_ids_first_seen": "2018-02-14 11:34:33.0",
  "tumble_start": "2018-02-14 11:34:30.0",
  "tumble_end": "2018-02-14 11:34:40.0",
  "HOP_start": "2018-02-14 11:37:50.0",
  "HOP_end": "2018-02-14 11:38:00.0"
}
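To make my expectation concrete, here is a minimal sketch in plain Scala (no Flink dependency) of how I understand hopping-window assignment for HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR), i.e. slide first, size second. The names HopWindowSketch, Window and hopWindows are made up for illustration, and the alignment of window starts to multiples of the slide is my assumption about the default behaviour, not something taken from Flink's code.

```scala
// Illustrative only: plain Scala, not Flink API. Times are epoch milliseconds.
object HopWindowSketch {
  // A window covers the half-open interval [start, end).
  final case class Window(start: Long, end: Long)

  // All hopping windows of length `size` that contain `ts`,
  // assuming window starts are aligned to multiples of `slide`.
  def hopWindows(ts: Long, slide: Long, size: Long): Seq[Window] = {
    // Latest aligned window start that is <= ts.
    val lastStart = ts - Math.floorMod(ts, slide)
    // Walk backwards in steps of `slide` while the window still contains ts,
    // i.e. while start > ts - size.
    (lastStart until (ts - size) by -slide).map(s => Window(s, s + size))
  }
}
```

With a 10 second slide and a 1 hour size, each event should fall into 360 windows, and every one of them should have HOP_end exactly 1 hour after HOP_start, which is why the 10 second gap in the row above surprises me.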
What I'm trying to do is exclude an id from the current "new_ids" window if it was already seen before (within the 1 hour scope of "seen_ids"), but that doesn't work either. This example result row also shows that "seen_ids.first_seen" is later than it should be: a 1 hour window ending at 11:38:00 should also contain the event from 11:34:33, so the minimum could not be 11:37:59.
Even if I can find a fix for this and get what I need, the strategy seems overly complicated. If anyone can suggest a better way, I'd be glad to hear it. If this were a batch job, it could be defined simply as:
SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H')
FROM events
In addition, when this query runs as a streaming job, the new distinct values should be written out every 10 seconds (ONLY the new ones, i.e. those not already seen within the enclosing 1 hour window). So far I haven't been able to figure out how to do that in a simple way with Flink.
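The semantics I'm after can be sketched in plain Scala as follows. This is not Flink code and not a proposed solution, just a description of the desired output; NewIdsSketch, Event and firstPerHour are hypothetical names, and I'm assuming the "1 hour window" means the calendar-hour bucket from the batch query above. The 10 second emission interval is not modelled here.

```scala
import scala.collection.mutable

// Illustrative only: plain Scala, not Flink API.
object NewIdsSketch {
  // rowtime is in epoch milliseconds.
  final case class Event(sCid: String, sAid1: String, rowtime: Long)

  val HourMs: Long = 3600000L

  // Keeps only the first event per (s_cid, s_aid1, hour bucket), in arrival order.
  // Events without s_aid1 are dropped, mirroring WHERE s_aid1 IS NOT NULL.
  def firstPerHour(events: Seq[Event]): Seq[Event] = {
    val seen = mutable.HashSet.empty[(String, String, Long)]
    events.filter { e =>
      // HashSet.add returns true only when the key was not present before.
      e.sAid1 != null && seen.add((e.sCid, e.sAid1, e.rowtime / HourMs))
    }
  }
}
```

For example, two events for the same s_cid and s_aid1 within the same hour should produce a single output row, while a third event for that pair in the next hour should produce a new one.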
*) TS_MIN is a custom function, but it's just a thin wrapper around Flink's MinAggFunction:
import java.sql.Timestamp
import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
import org.apache.flink.table.functions.aggfunctions.MaxAggFunction
import org.apache.flink.table.functions.aggfunctions.MinAggFunction

object TimestampAggFunctions {

  trait TimestampAggFunction {
    def getInitValue = null
    def getValueTypeInfo = SqlTimeTypeInfo.TIMESTAMP
  }

  class TimestampMinAggFunction extends MinAggFunction[Timestamp] with TimestampAggFunction
  class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] with TimestampAggFunction
}

// Registered with:
tableEnv.registerFunction("TS_MIN", new TimestampMinAggFunction());