Hi!
We are using apache-flink-1.4.2. It seems this version doesn't support COUNT(DISTINCT). I am trying to find a way to dedup the stream. So I tried:

    SELECT
      CONCAT_WS(
        '-',
        CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
        CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
        CAST(user_id AS VARCHAR)
      ),
      COUNT(DISTINCT(event_id)) AS event_count
    FROM event_foo
    GROUP BY user_id, MONTH(longToDateTime(rowtime)), YEAR(longToDateTime(rowtime))

(The duplicate events have the same 'event_id' (and user_id); the other fields, e.g. timestamps, may or may not be different.)

But that failed because DISTINCT is not supported. As a workaround I tried:

    SELECT
      CONCAT_WS(
        '-',
        CAST(MONTH(row_datetime) AS VARCHAR),
        CAST(YEAR(row_datetime) AS VARCHAR),
        CAST(user_id AS VARCHAR)
      ),
      COUNT(event_id) AS event_count
    FROM (
      SELECT
        user_id,
        event_id,
        maxtimestamp(longToDateTime(rowtime)) as row_datetime
      FROM event_foo
      GROUP BY event_id, user_id
    )
    GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)

I am hoping the inner SELECT does the deduping, because logically it is equivalent to a DISTINCT. This works in my functional testing. Will it also work if the duplicates span different event buckets? I was hoping that as long as the events arrive within the state "retention time" in Flink they should be deduped, but I am new to Flink so I am not sure about that. Can someone please correct me if I am wrong?

Is this a reasonable workaround for the lack of DISTINCT support? Please let me know if there is a better way.

Thanks,
Vinod
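Editor's sketch: to make the "duplicates spanning different buckets" question concrete, the two queries can be imitated in plain Python over a small in-memory list. This is only a batch-style approximation under stated assumptions, not Flink code: the function names, the `ms` helper, and the sample events are hypothetical, and `rowtime` is assumed to be epoch milliseconds in UTC.

```python
from collections import defaultdict
from datetime import datetime, timezone

def ms(*args):
    """Hypothetical helper: epoch milliseconds for a UTC date/time."""
    return int(datetime(*args, tzinfo=timezone.utc).timestamp() * 1000)

def bucket(ts_ms):
    """(month, year) of an epoch-millisecond timestamp, in UTC."""
    d = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return (d.month, d.year)

def count_distinct(events):
    """Imitates query 1: COUNT(DISTINCT event_id) per (user_id, month, year)."""
    seen = defaultdict(set)
    for user_id, event_id, ts in events:
        seen[(user_id, *bucket(ts))].add(event_id)
    return {key: len(ids) for key, ids in seen.items()}

def count_after_dedup(events):
    """Imitates query 2: keep the max timestamp per (event_id, user_id), then count."""
    latest = {}
    for user_id, event_id, ts in events:
        key = (event_id, user_id)
        latest[key] = max(ts, latest.get(key, ts))
    counts = defaultdict(int)
    for (event_id, user_id), ts in latest.items():
        counts[(user_id, *bucket(ts))] += 1
    return dict(counts)

# Event "a" is duplicated within May; event "b" is duplicated across the
# May/June boundary.
events = [
    (1, "a", ms(2019, 5, 10, 0, 0)),
    (1, "a", ms(2019, 5, 10, 0, 5)),
    (1, "b", ms(2019, 5, 31, 23, 59)),
    (1, "b", ms(2019, 6, 1, 0, 1)),
]
print(count_distinct(events))
print(count_after_dedup(events))
```

In this sketch the two approaches agree for the duplicate inside one month but differ for the one that crosses the boundary: the DISTINCT version counts event "b" in both May and June, while the dedup version counts it only in June, because the inner query keeps the maximum timestamp. In a streaming job the May count would presumably be corrected by a retraction once the later duplicate arrives, provided the inner state is still present.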
More details on the error with query #1, which used COUNT(DISTINCT()):

    org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

    FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'], expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"], expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)], EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8], lower_boundary=[$t3], latency_marker=[$t4])
      FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
        FlinkLogicalAggregate(group=[{0, 1, 2}], lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
          FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)], expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)], expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)], expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)], expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'], expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)], expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3], $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
            FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
        FlinkLogicalAggregate(group=[{0, 1, 2}], mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
          FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
            FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)], expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)], expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)], expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)], expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'], expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)], expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3], $f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
              FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])

    This exception indicates that the query uses an unsupported SQL feature.
    Please check the documentation for the set of currently supported SQL features.
        at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
        at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
        at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)

On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <[hidden email]> wrote:
Another interesting thing is that if I add DISTINCT in the 2nd query, Flink doesn't complain. But it is a no-op there, because the inner select is already doing the deduping:

    SELECT
      CONCAT_WS(
        '-',
        CAST(MONTH(row_datetime) AS VARCHAR),
        CAST(YEAR(row_datetime) AS VARCHAR),
        CAST(user_id AS VARCHAR)
      ),
      COUNT(DISTINCT(event_id)) AS event_count
      -- note the DISTINCT keyword here. Flink doesn't barf for this.
    FROM (
      SELECT
        user_id,
        event_id,
        maxtimestamp(longToDateTime(rowtime)) as row_datetime
      FROM event_foo
      GROUP BY event_id, user_id
    )
    GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)

On Wed, May 29, 2019 at 5:15 PM Vinod Mehra <[hidden email]> wrote:
Hi Vinod,

IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released August 9th, 2018) [1].

Also note that by default, this query will accumulate more and more state, i.e., for each grouping key it will hold all unique event_ids. You could configure an idle state retention time to clean up unused state.

Regarding the boundaries, with the current query they are fixed to one month and sharply cut (as one would expect). You could try to use a long-running session window [3]. This would also remove the need for the idle state configuration, because Flink would know when state can be discarded.

Hope this helps,
Fabian

On Thu, May 30, 2019 at 02:18, Vinod Mehra <[hidden email]> wrote:
Thanks a lot Fabian for the detailed response. I know all the duplicates are going to arrive within an hour max of the actual event. So using a 1 hour running session window should be fine for me. Is the following the right way to do it in apache-flink-1.4.2?
Thanks,
Vinod

On Mon, Jun 3, 2019 at 4:52 AM Fabian Hueske <[hidden email]> wrote:
To be clear, I want the outer grouping to have a longer retention time (on the order of a week or a month, for which we are using 'idle state retention time') and the inner grouping to have a shorter retention period (1 hour max). So I am hoping the session window will do the right thing.

Thanks,
Vinod

On Tue, Jun 4, 2019 at 5:14 PM Vinod Mehra <[hidden email]> wrote:
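Editor's sketch: the idea of deduplicating with a 1-hour session window can be imitated in plain Python to see when it holds. This is not the query from the thread and not Flink code; `session_dedup`, the sample events, and the gap handling (two timestamps join the same session when they are at most one gap apart) are assumptions made for illustration.

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000

def session_dedup(events, gap_ms=HOUR_MS):
    """Emit one (user_id, event_id, max_ts) row per session of each key.

    A session for a (user_id, event_id) key ends when the next timestamp
    for that key is more than gap_ms after the previous one.
    """
    by_key = defaultdict(list)
    for user_id, event_id, ts in events:
        by_key[(user_id, event_id)].append(ts)

    rows = []
    for (user_id, event_id), stamps in by_key.items():
        stamps.sort()
        session_end = stamps[0]
        for ts in stamps[1:]:
            if ts - session_end > gap_ms:
                # Gap exceeded: close the current session and start a new one.
                rows.append((user_id, event_id, session_end))
            session_end = ts
        rows.append((user_id, event_id, session_end))
    return rows

events = [
    (1, "a", 0),
    (1, "a", 10 * 60 * 1000),  # duplicate 10 minutes later: same session
    (1, "b", 0),
    (1, "b", 2 * HOUR_MS),     # duplicate 2 hours later: new session
]
print(session_dedup(events))
```

Under these assumptions, a duplicate that falls within the gap collapses into one row, while a duplicate that falls outside it produces a second row and would be counted twice by the outer query. The approach therefore relies on the stated guarantee that all duplicates arrive within an hour of the actual event.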
Hi Vinod,

Sorry for the late reply. Your approach looks good to me.

A few things to note:

* It is not possible to set different idle state retention timers for different parts of a query. All operators that support idle state retention use the same configuration.
* The inner query with the SESSION window does not support (and also does not need) idle state retention timers. SESSION window state is automatically cleaned up based on watermarks. So, you don't have to worry about this one.
* If you set the idle state retention time to one month (+ some safety margin) it should not affect the correctness of the results because the query would maintain the counts for one more month after the last access. However, this also means that you keep up to two months of state around (up to two counts for each user).

Cheers,
Fabian

On Wed, Jun 5, 2019 at 03:07, Vinod Mehra <[hidden email]> wrote:
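Editor's sketch: the last point, about keeping up to two counts per user, can be illustrated with a toy model of idle state retention in plain Python. This is a simplification and not Flink's implementation: the `IdleStateCounts` class is hypothetical, it uses a single retention value, and it expires state eagerly on access.

```python
from datetime import datetime, timedelta

class IdleStateCounts:
    """Per-key counters that are dropped after `retention` without access."""

    def __init__(self, retention):
        self.retention = retention
        self.counts = {}
        self.last_access = {}

    def expire(self, now):
        """Remove every key that has been idle for at least `retention`."""
        idle = [k for k, t in self.last_access.items() if now - t >= self.retention]
        for key in idle:
            del self.counts[key]
            del self.last_access[key]

    def increment(self, key, now):
        """Count one event for `key` and refresh its last-access time."""
        self.expire(now)
        self.counts[key] = self.counts.get(key, 0) + 1
        self.last_access[key] = now
        return self.counts[key]

# Keys are (user_id, month, year); retention is one month.
store = IdleStateCounts(retention=timedelta(days=31))
store.increment((1, 5, 2019), datetime(2019, 5, 31))
store.increment((1, 6, 2019), datetime(2019, 6, 15))
print(sorted(store.counts))  # May and June counts are both still held

store.expire(datetime(2019, 7, 2))
print(sorted(store.counts))  # the May count has been idle for over 31 days
```

In this model the May count is still held in mid-June, so each user has two counts in state at once. If the retention were much shorter than the bucket, a late event for an already expired key would restart its count at 1, which is the correctness risk that the one-month setting avoids.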