Hello,
I'm checking if this is intentional or a bug in Apache Flink SQL (Flink 1.6.0).

I am using processing time with a RocksDB backend. I have not checked whether this issue also occurs in the Table API, nor whether it also exists for event time (although I suspect it does).

Consider the following two queries:

    SELECT foo, COUNT(bar)
    FROM MyTable
    WHERE faz = 'xyz'
    GROUP BY HOP(myTime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), foo

and

    SELECT foo, COUNT(bar)
    FROM MyTable
    WHERE faz = 'xyz'
    GROUP BY TUMBLE(myTime, INTERVAL '5' SECOND), foo

In my testing, for both of the above, events received in the first 5 seconds are ignored. In other words, the first window interval is effectively a black hole: only events received after the stream has been "up" for 5 seconds are processed.

Is this ignoring of events during the first interval a bug or intentional?

Many thanks,

John
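For reference, a minimal self-contained sketch of how such a query can be wired up against the Flink 1.6 Table API. The in-memory source, class name, and field values are illustrative assumptions, not from this thread; only the HOP query itself is quoted from the report:

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class HopWindowSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            // Toy in-memory source standing in for the reporter's table;
            // "myTime.proctime" appends a processing-time attribute.
            DataStream<Tuple3<String, Long, String>> input = env.fromElements(
                    Tuple3.of("a", 1L, "xyz"),
                    Tuple3.of("b", 2L, "xyz"));
            tEnv.registerDataStream("MyTable", input, "foo, bar, faz, myTime.proctime");

            Table result = tEnv.sqlQuery(
                    "SELECT foo, COUNT(bar) " +
                    "FROM MyTable " +
                    "WHERE faz = 'xyz' " +
                    "GROUP BY HOP(myTime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), foo");

            // Note: with a finite source and processing-time windows, the job
            // may terminate before the last window fires (see Fabian's reply below).
            tEnv.toAppendStream(result, Row.class).print();
            env.execute("hop-window-sketch");
        }
    }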
Hi John,

Are you sure that the first rows of the first window are dropped? When a query with processing-time windows is terminated, the last window is not computed. This is in fact intentional and does not apply to event-time windows.

Best,
Fabian
Hi John,
I've not dug into this yet, but IMO, it shouldn't be the case. I just wonder how you judge that the data in the first five seconds are not processed by the system?

Best,
Xingcan
Yes, I am certain events are being ignored or dropped during the first five seconds. Further investigation on my part reveals that the "ignore" period is exactly the first five seconds of the stream, regardless of the size of the window.
Situation:

I have a script which pushes an event into Kafka once every second, structured as:

    {"userId": "[hidden email]", "timestamp": <timestamp from the producer>}

My stream uses this Kafka queue as its source. The JSON schema and table schema are as follows:

    final Json jsonFormat = new Json()
        .failOnMissingField(false)
        .jsonSchema("{" +
            "  type: 'object'," +
            "  properties: {" +
            "    userId: { type: 'string' }," +
            "    timestamp: { type: 'integer' }" +
            "  }" +
            "}");

    final Schema tableSchema = new Schema()
        .field("userId", Types.STRING())
        .field("timestamp", TypeInformation.of(BigDecimal.class))
        .field("proctime", Types.SQL_TIMESTAMP())
        .proctime();

The StreamTableEnvironment is configured to be in append mode, and the table source is named "MyEventTable". The stream uses the following SQL query:

    final String sql =
        " SELECT userId, `timestamp` " +
        " FROM MyEventTable " +
        " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), userId, `timestamp` ";
    final Table resultTable = tableEnvironment.sqlQuery(sql);

Code which I'm using to verify that events are being dropped:

    tableEnvironment.toAppendStream(resultTable, Row.class)
        .map((MapFunction<Row, String>) row -> {
            final String userId = row.getField(0).toString();
            final BigDecimal timestamp = (BigDecimal) row.getField(1);
            return String.format("(%s, %s)", userId, timestamp.toString());
        })
        .print();

No events produced during the first five seconds following a cold start of Flink are ever printed to the console. Any and all events produced after the first five seconds following a cold start of Flink are always printed to the console. All processes are running on the same system.

This issue does not occur when using raw Flink (windowed or not), nor when using Flink CEP. Again, I have not tried the Table API.
Hmm, that's interesting. HOP and TUMBLE window aggregations are directly translated into their corresponding DataStream counterparts (sliding and tumbling windows). There should be no filtering of records. I assume you tried a simple query like "SELECT * FROM MyEventTable" and received all the expected data?

Fabian
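A minimal sketch of that passthrough check, reusing the (assumed) tableEnvironment and "MyEventTable" names from John's snippet above:

    // Print every row the source produces, with no window or GROUP BY, to
    // check whether the first seconds of data arrive at all.
    final Table allRows = tableEnvironment.sqlQuery("SELECT * FROM MyEventTable");
    tableEnvironment.toAppendStream(allRows, Row.class).print();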
This is in fact a very strange behavior. To add to the discussion: when you mentioned "raw Flink (windowed or not) nor when using Flink CEP", how were the comparisons done? Also, were you able to get correct results without the additional GROUP BY term "foo" or "userId"?

-- Rong
Hi John,
I suppose that was caused by the groupBy field "timestamp". You were actually grouping on two time fields simultaneously: the processing time and the time from your producer. As @Rong suggested, try removing the additional groupBy field "timestamp" and check the result again.

Best,
Xingcan
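A sketch of what that could look like, assuming John's setup from above. Dropping `timestamp` from the key means it must either leave the SELECT list or be aggregated; MAX is used here as one arbitrary choice:

    final String fixedSql =
        " SELECT userId, MAX(`timestamp`) " +
        " FROM MyEventTable " +
        " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), userId ";
    final Table fixedResult = tableEnvironment.sqlQuery(fixedSql);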
Thank you all for your assistance. I believe I've found the root cause of the behavior I am seeing.
If I just use "SELECT * FROM MyEventTable" (Fabian's question), I find that events received in the first 3 seconds are ignored, as opposed to the original 5. What I'm seeing suggests that the system is simply not ready to receive events during that time, i.e. the underlying DataStream is still being created, and that removing the GROUP BY reduces the overall size of the stream, which in turn reduces the initialization time.

Many thanks,

John
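If one wanted to check that hypothesis, a rough sketch, reusing the same assumed names as John's snippet; comparing clocks like this is only meaningful because the producer and Flink run on the same machine here:

    // Record the (client-side) start time and print how long after it each
    // result arrives, to observe the initialization gap described above.
    final long startedAt = System.currentTimeMillis();
    tableEnvironment.toAppendStream(resultTable, Row.class)
        .map((MapFunction<Row, String>) row ->
            (System.currentTimeMillis() - startedAt) + " ms after start: " + row)
        .print();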
Hi John,

Just to clarify: the missing data is due to the startup overhead and not due to a bug?

Best,
Fabian
Fabian,
I believe so, yes.

Many thanks,

John