Hi,
I have a use case where I would like to find distinct rows over a certain period of time. The requirement is that a new row is emitted asap. Otherwise the requirement is mainly to filter the data down to a smaller dataset for downstream. I noticed that SELECT DISTINCT with a state retention time of 12 hours would in theory do the trick. You can find the code below.

A few questions:

1) Why is SELECT DISTINCT creating a retract stream? In which scenarios would we get update/delete rows?

2) If I run the code below with the example data (also below) without the state retention config, I get the two append rows (expected). If I run exactly the code below (with the retention config), I get two appends and one delete for AN1234 and then one append for AN5555. What is going on?

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    StreamQueryConfig qConfig = tableEnv.queryConfig();
    // set idle state retention time. min = max = 12 hours
    qConfig.withIdleStateRetentionTime(Time.hours(12));

    // create a TableSource
    CsvTableSource csvSource = CsvTableSource
        .builder()
        .path("data.csv")
        .field("ts", Types.SQL_TIMESTAMP())
        .field("aid1", Types.STRING())
        .field("aid2", Types.STRING())
        .field("advertiser_id", Types.STRING())
        .field("platform_id", Types.STRING())
        .fieldDelimiter(",")
        .build();

    tableEnv.registerTableSource("CsvTable", csvSource);

    Table result = tableEnv.sqlQuery(
        "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");

    StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(
        new String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
        new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()});

    result.writeToSink(out, qConfig);

    env.execute();

Here is a simple csv dataset of three rows:

    2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
    2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
    2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891
|
Hi Henri,
I'll try to answer your questions:

1) You are right, SELECT DISTINCT should not need a retract stream. Internally, it is translated into an aggregation without an aggregate function call. So this definitely needs improvement.

2) The problem is that SELECT DISTINCT is neither officially supported nor tested. I opened an issue for this [1]. Until this issue is fixed, I would recommend implementing a custom aggregate function that keeps track of the values seen so far [2].

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-8564
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions

On 2/6/18 at 11:11 AM, Henri Heiskanen wrote:
|
Hi Henri,
I just noticed that I had a tiny mistake in my little test program. So SELECT DISTINCT is officially supported. But the question of whether this is a valid append stream is still up for discussion. I will loop in Fabian (in CC).

For the general behavior, you can also look into the code and especially the comments there [1].

Regards,
Timo

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala

On 2/6/18 at 1:36 PM, Timo Walther wrote:
|
Hi Henri,

thanks for reaching out and providing code and data to reproduce the issue.

I think you are right, a "SELECT DISTINCT a, b, c FROM X" should not result in a retraction stream. However, with the current implementation we internally need a retraction stream if a state retention time is configured.

The reason lies in how state retention time is defined: the state retention time removes the state for a key if that key hasn't been seen for x time. This means that an operator resets the state clean-up timer of a key whenever a new record with that key is received. This is also true for retraction / insertion messages of the same record. If we implemented the GroupBy that performs the DISTINCT as an operator that emits an append stream, the downstream operators would not see any updates, because the GroupBy only emits the first record and filters out all duplicates. Hence, downstream operators would perform a clean-up too early.

I see that these are internals that users should not need to worry about, but right now there is no easy solution to this. Eventually, the clean-up timer reset should be implemented differently than by retracting and re-inserting the same record. However, this would be a more involved change and requires good planning.

I'll file a JIRA for that.

Thanks again for bringing the issue to our attention.

Best,
Fabian

2018-02-06 13:59 GMT+01:00 Timo Walther <[hidden email]>:
|
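As a rough illustration of the message pattern Fabian describes above, here is a plain-Java simulation (no Flink classes involved). The class name `DistinctSimulation` and its methods are hypothetical and only model the behavior discussed in this thread: the first occurrence of a row is emitted as an insert, and when state retention is enabled a duplicate is forwarded as a retract/insert pair so that downstream operators can reset their clean-up timers.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simulation only: models the emitted messages, not Flink's actual operator code.
class DistinctSimulation {
    private final boolean retentionEnabled;
    private final Set<String> seen = new HashSet<>();
    final List<String> emitted = new ArrayList<>();

    DistinctSimulation(boolean retentionEnabled) {
        this.retentionEnabled = retentionEnabled;
    }

    void process(String row) {
        if (seen.add(row)) {
            // First occurrence of this row: emit an insert ("+").
            emitted.add("+" + row);
        } else if (retentionEnabled) {
            // Duplicate with retention enabled: retract ("-") and re-insert ("+")
            // so that downstream operators see activity for this key.
            emitted.add("-" + row);
            emitted.add("+" + row);
        }
        // Duplicate without retention: nothing is emitted.
    }

    public static void main(String[] args) {
        String[] rows = {"AN1234", "AN1234", "AN5555"};

        DistinctSimulation withRetention = new DistinctSimulation(true);
        DistinctSimulation withoutRetention = new DistinctSimulation(false);
        for (String row : rows) {
            withRetention.process(row);
            withoutRetention.process(row);
        }
        System.out.println(withRetention.emitted);    // [+AN1234, -AN1234, +AN1234, +AN5555]
        System.out.println(withoutRetention.emitted); // [+AN1234, +AN5555]
    }
}
```

With the three example keys from the first message, the simulated output with retention matches what Henri reported: two appends and one delete for AN1234, then one append for AN5555.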
Hi,

Thanks. Doing this deduplication would be easy using just the vanilla Flink API and state (check if this is a new key and then emit), but the issue has been automatic state cleanup. However, it looks like the streaming SQL retention time implementation uses a process function and timers. I was a bit reluctant to use that because I was worried that the approach would be overkill with our volumes, but maybe it will work just fine. Can you help me a bit with how to implement it efficiently?

Basically, we get an estimated 20M distinct rows/keys and roughly 300 events per key during one day. What I would like to do is clear the state for a specific key if I have not seen that key for the last 12 hours. I think it's very close to the example here: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html. Instead of emitting the data in onTimer, I would just clear the state. In the example, each tuple invokes registerEventTimeTimer(). Is this the correct pattern? E.g. in our case we could get hundreds of events with the same key within a few minutes, so would we then register hundreds of timer instances?

Br,
Henkka

On Tue, Feb 6, 2018 at 3:45 PM, Fabian Hueske <[hidden email]> wrote:
|
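The pattern Henri asks about above (emit a key the first time it is seen, register a timer on every event, and clear the state in the timer callback only if the key has been idle for the whole retention period) can be sketched as a plain-Java simulation. This is not Flink code: the class `IdleDedupSimulation`, its methods, and the abstract time units are assumptions made for illustration, and the priority queue merely stands in for a timer service.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Simulation only: one timer per event, stale timers are ignored when they fire.
class IdleDedupSimulation {
    private final long retention;
    private final Map<String, Long> lastSeen = new HashMap<>();
    // Pending timers as (fire time, key), ordered by fire time.
    private final PriorityQueue<Map.Entry<Long, String>> timers =
            new PriorityQueue<>(Map.Entry.<Long, String>comparingByKey());
    int timersRegistered = 0;

    IdleDedupSimulation(long retention) {
        this.retention = retention;
    }

    // Returns true if the key is new (or its state was cleaned up) and should be emitted.
    boolean processElement(String key, long now) {
        fireTimersUpTo(now);
        boolean isNew = lastSeen.put(key, now) == null;
        timers.add(new SimpleEntry<Long, String>(now + retention, key));
        timersRegistered++;
        return isNew;
    }

    private void fireTimersUpTo(long now) {
        while (!timers.isEmpty() && timers.peek().getKey() <= now) {
            Map.Entry<Long, String> timer = timers.poll();
            Long seen = lastSeen.get(timer.getValue());
            // Clear state only if no newer event arrived after this timer was registered.
            if (seen != null && seen + retention == timer.getKey()) {
                lastSeen.remove(timer.getValue());
            }
        }
    }

    public static void main(String[] args) {
        IdleDedupSimulation dedup = new IdleDedupSimulation(720); // 12 hours in minutes
        System.out.println(dedup.processElement("AN1234", 0));    // first sighting
        System.out.println(dedup.processElement("AN1234", 100));  // duplicate
        System.out.println(dedup.processElement("AN1234", 2000)); // seen again after idle period
        System.out.println("timers registered: " + dedup.timersRegistered);
    }
}
```

In this sketch every event registers its own timer, which is exactly the cost Henri is worried about: hundreds of events for one key mean hundreds of pending timers, most of which do nothing when they fire.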
Hi Henkka,

This should be fairly easy to implement in a ProcessFunction. That's why the state retention time allows you to set a min and a max time. With that, you only have to set a timer every (max - min) interval. For example, if you say the application should keep the state for at least 12 hours but at most 14 hours, you only need to register a new timer every 2 hours.

2018-02-06 15:47 GMT+01:00 Henri Heiskanen <[hidden email]>:
|
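One way to read the min/max suggestion above is sketched below as a plain-Java simulation for a single key. The class `CleanupTimerSimulation` and its fields are hypothetical, and the exact rule used here (register a new timer at `now + max` only when `now + min` lies beyond the currently registered clean-up time) is an assumption about how such a scheme could work, not a copy of Flink's implementation.

```java
// Simulation only: tracks the clean-up timer of one key and counts registrations.
class CleanupTimerSimulation {
    private final long minRetention;
    private final long maxRetention;
    private Long cleanupTime = null; // clean-up time of the currently valid timer, if any
    int timersRegistered = 0;

    CleanupTimerSimulation(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    void onEvent(long now) {
        long earliestCleanup = now + minRetention;
        // Register a new timer only if the pending one would fire too early.
        if (cleanupTime == null || earliestCleanup > cleanupTime) {
            cleanupTime = now + maxRetention;
            timersRegistered++;
        }
    }

    // Returns true if the state is cleared; timers that were superseded are ignored.
    boolean onTimer(long timestamp) {
        if (cleanupTime != null && timestamp == cleanupTime) {
            cleanupTime = null;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Times in minutes: min = 12 hours, max = 14 hours, one event per minute for 300 minutes.
        CleanupTimerSimulation minMax = new CleanupTimerSimulation(720, 840);
        CleanupTimerSimulation minEqualsMax = new CleanupTimerSimulation(720, 720);
        for (long t = 0; t < 300; t++) {
            minMax.onEvent(t);
            minEqualsMax.onEvent(t);
        }
        System.out.println(minMax.timersRegistered);       // 3
        System.out.println(minEqualsMax.timersRegistered); // 300
    }
}
```

Under these assumptions, 300 events within five hours register only three timers (roughly one every two hours), whereas min = max registers one timer per event. The state is still kept for at least the min time after the last event and removed no later than the max time after it.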
Hi, Oh, right.. got it. Thanks! Br, Henkka On Tue, Feb 6, 2018 at 5:01 PM, Fabian Hueske <[hidden email]> wrote:
|