Potential bug in Flink SQL HOP and TUMBLE operators

10 messages

Potential bug in Flink SQL HOP and TUMBLE operators

John Stone
Hello,

I'm checking if this is intentional or a bug in Apache Flink SQL (Flink 1.6.0).

I am using processing time with a RocksDB backend.  I have not checked if this issue is also occurring in the Table API.  I have not checked if this issue also exists for event time (although I suspect it does).

Consider the following two queries:

"SELECT foo, COUNT(bar)
FROM MyTable
WHERE  faz = 'xyz'
GROUP BY HOP(myTime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), foo"

and

"SELECT foo, COUNT(bar)
FROM MyTable
WHERE  faz = 'xyz'
GROUP BY TUMBLE(myTime, INTERVAL '5' SECOND), foo"
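For reference, HOP assigns each event to size/slide overlapping windows, and TUMBLE is the special case where the slide equals the size, so an event arriving in any second should surface in at least one window result. A minimal, self-contained Java sketch of that assignment arithmetic (mirroring Flink's sliding-window start computation with a zero offset; the class and method names here are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class HopWindows {

    // Start timestamps of all HOP windows an event at timestampMs falls into,
    // assuming a zero window offset. With slide == size this degenerates to a
    // single window, i.e. TUMBLE.
    static List<Long> assignHopWindowStarts(long timestampMs, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // HOP(1s slide, 5s size): an event at t = 7s belongs to the five
        // windows starting at 7s, 6s, 5s, 4s and 3s.
        System.out.println(assignHopWindowStarts(7_000L, 1_000L, 5_000L));
        // TUMBLE(5s): the same event belongs to the single window [5s, 10s).
        System.out.println(assignHopWindowStarts(7_000L, 5_000L, 5_000L));
    }
}
```

By this arithmetic an event at t = 1s still falls into windows starting at 1s, 0s, and earlier, so there is no obvious reason for the first interval to drop it.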

I have found in my testing for both of the above that events received in the first 5 seconds are ignored.  In other words, the first window interval is effectively a black hole, and only events which are received starting after the first 5 seconds of the stream being "up" are processed.

Is this ignoring of events during the first interval a bug or intentional?

Many thanks,

John

Re: Potential bug in Flink SQL HOP and TUMBLE operators

Fabian Hueske-2
Hi John,

Are you sure that the first rows of the first window are dropped?
When a query with processing-time windows is terminated, the last window is not computed. This is in fact intentional, and it does not apply to event-time windows.

Best, Fabian




Re: Potential bug in Flink SQL HOP and TUMBLE operators

Xingcan Cui
In reply to this post by John Stone
Hi John,

I’ve not dug into this yet, but IMO, it shouldn’t be the case. I just wonder how you judged that the data in the first five seconds were not processed by the system?

Best,
Xingcan



Re: Potential bug in Flink SQL HOP and TUMBLE operators

John Stone
In reply to this post by John Stone
Yes, I am certain events are being ignored or dropped during the first five seconds.  Further investigation on my part reveals that the "ignore" period is exactly the first five seconds of the stream - regardless of the size of the window.

Situation

I have a script which pushes an event into Kafka once every second structured as:

{"userId": "[hidden email]", "timestamp": <timestamp from the producer>}

My stream uses this Kafka queue as its source.  JSON schema and table schema are as follows:

final Json jsonFormat = new Json()
        .failOnMissingField(false)
        .jsonSchema("{"
            + "  type: 'object',"
            + "  properties: {"
            + "    userId: { type: 'string' },"
            + "    timestamp: { type: 'integer' }"
            + "  }"
            + "}");

final Schema tableSchema = new Schema()
        .field("userId", Types.STRING())
        .field("timestamp", TypeInformation.of(BigDecimal.class))
        .field("proctime", Types.SQL_TIMESTAMP())
        .proctime();

The StreamTableEnvironment is configured to be in append mode, and the table source is registered as "MyEventTable".  The stream uses the following SQL query:

final String sql =
    " SELECT userId, `timestamp` "
        + " FROM MyEventTable "
        + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), userId, `timestamp` ";
final Table resultTable = tableEnvironment.sqlQuery(sql);

Code which I'm using to verify that events are being dropped:

tableEnvironment.toAppendStream(resultTable, Row.class)
    .map((MapFunction<Row, String>) row -> {
      final String userId = row.getField(0).toString();
      final BigDecimal timestamp = (BigDecimal) row.getField(1);

      return String.format(
          "(%s, %s)",
          userId, timestamp.toString()
      );
    })
    .print();


No events produced during the first five seconds following a cold start of Flink are ever printed to the console.  Any and all events produced after the first five seconds following a cold start of Flink are always printed to the console.  All processes are running on the same system.

This issue does not occur when using raw Flink (windowed or not) nor when using Flink CEP.  Again, have not tried Table API.


Re: Potential bug in Flink SQL HOP and TUMBLE operators

Fabian Hueske-2
Hmm, that's interesting.
HOP and TUMBLE window aggregations are directly translated into their corresponding DataStream counterparts (Sliding, Tumble).
There should be no filtering of records.

I assume you tried a simple query like "SELECT * FROM MyEventTable" and received all expected data?

Fabian




Re: Potential bug in Flink SQL HOP and TUMBLE operators

Rong Rong
This is in fact a very strange behavior. 

To add to the discussion, when you mentioned: "raw Flink (windowed or not) nor when using Flink CEP", how were the comparisons being done? 
Also, were you able to get correct results without the additional GROUP BY term "foo" or "userId"?

--
Rong




Re: Potential bug in Flink SQL HOP and TUMBLE operators

Xingcan Cui
Hi John,

I suppose that was caused by the groupBy field “timestamp”. You were actually grouping on two time fields simultaneously, the processing time and the time from your producer. As @Rong suggested, try removing the additional groupBy field “timestamp” and check the result again.

Best,
Xingcan
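A sketch of this suggestion applied to the query from earlier in the thread: the producer-side "timestamp" is dropped from the GROUP BY, and since a non-grouped column cannot be selected as-is, it is folded into MAX() here. This rewrite is an assumption, not something tested against John's setup, and whether MAX() is the right aggregate depends on the use case:

```java
public class RevisedQuery {

    // The HOP query from the earlier post with the producer-side `timestamp`
    // removed from the GROUP BY and aggregated with MAX() instead, so only
    // proctime and userId determine the grouping.
    static final String SQL =
        " SELECT userId, MAX(`timestamp`) "
            + " FROM MyEventTable "
            + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND), userId ";

    public static void main(String[] args) {
        System.out.println(SQL);
    }
}
```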





Re: Potential bug in Flink SQL HOP and TUMBLE operators

John Stone
In reply to this post by John Stone
Thank you all for your assistance.  I believe I've found the root cause of the behavior I am seeing.

If I just use "SELECT * FROM MyEventTable" (Fabian's question), I find that events received in the first 3 seconds are ignored as opposed to the original 5.

What I'm seeing suggests that the system is simply not ready to receive events during that time, i.e. the underlying DataStream is still being created, and that removing the GROUP BY reduces the complexity of the job, which in turn reduces the initialization time.

Many thanks,

John

Re: Potential bug in Flink SQL HOP and TUMBLE operators

Fabian Hueske-2
Hi John,

Just to clarify: the missing data is due to the startup overhead and not due to a bug?

Best, Fabian



Re: Potential bug in Flink SQL HOP and TUMBLE operators

John Stone
Fabian,

I believe so, yes.

Many thanks,

John