Does Flink SQL "in" operation has length limit?


Does Flink SQL "in" operation has length limit?

徐涛
Hi,
    When I am executing the following SQL in Flink 1.6.1, an error is thrown saying that the query uses an unsupported feature. But when I reduce the number of integers in the IN clause, for example,
    trackId in (124427150,71648998), Flink does not complain at all. So I wonder: is there a length limit on the IN operation?
Thanks a lot.

SELECT
    trackId as id, track_title as description, count(*) as cnt
FROM
    play
WHERE
    appName='play.statistics.trace' and
    trackId in (124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
GROUP BY
    HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' MINUTE), trackId, track_title;


FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
  FlinkLogicalCalc(expr#0..3=[{inputs}], started_at_ts=[$t2], trackId=[$t0], track_title=[$t1])
    FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
      FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)], trackId=[$t1], track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
        FlinkLogicalNativeTableScan(table=[[play]])
      FlinkLogicalValues(tuples=[[{ 124427150 }, { 71648998 }, { 124493327 }, { 524043 }, { 27300837 }, { 30300481 }, { 27300809 }, { 124744768 }, { 45982512 }, { 124526566 }, { 124556427 }, { 124804208 }, { 74302264 }, { 119588973 }, { 30496269 }, { 27300288 }, { 124098818 }, { 125071530 }, { 120918746 }, { 124171456 }, { 30413034 }, { 124888075 }, { 125270551 }, { 125434224 }, { 27300195 }, { 45982342 }, { 45982468 }, { 45982355 }, { 65349883 }, { 124705962 }, { 65349905 }, { 124298305 }, { 124889583 }, { 45982338 }, { 20506255 }, { 18556415 }, { 122161128 }, { 27299018 }, { 122850375 }, { 124862362 }, { 45982336 }, { 59613202 }, { 122991190 }, { 124590280 }, { 124867563 }, { 45982332 }, { 124515944 }, { 20506257 }, { 122572115 }, { 92083574 }]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
at com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:141)
at com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:139)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.ximalaya.flink.dsl.application.FlinkApplication$.main(FlinkApplication.scala:139)
at com.ximalaya.flink.dsl.web.test.DslTestUtils$.executeDslFile(DslTestUtils.scala:69)
at com.ximalaya.flink.dsl.web.test.PlayCountTest$.main(PlayCountTest.scala:5)
at com.ximalaya.flink.dsl.web.test.PlayCountTest.main(PlayCountTest.scala)

Best 
Henry

Re: Does Flink SQL "IN" operation have a length limit?

vino yang
Hi Henry,

Maybe the number of elements in your IN clause exceeds the conversion threshold? Its default value is 20; you can modify it with this configuration item:

withInSubQueryThreshold(XXX)

This API comes from Calcite.
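
For reference, here is a minimal sketch of that Calcite-level setting. Note that this is Calcite's own SqlToRelConverter.Config API; whether and how your Flink version exposes it through its table configuration is an assumption you would need to verify.

import org.apache.calcite.sql2rel.SqlToRelConverter;

public class InThresholdExample {
    public static void main(String[] args) {
        // Above this many literals, Calcite rewrites an IN list into a
        // sub-query (a join against a VALUES relation). The default of 20
        // would explain why ~50 ids trigger the join translation while
        // 2 ids do not.
        SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
            .withInSubQueryThreshold(100)
            .build();
        System.out.println("IN sub-query threshold: " + config.getInSubQueryThreshold());
    }
}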

Thanks, vino.


Re: Does Flink SQL "IN" operation have a length limit?

Rong Rong
Hi Henry, Vino.

I think the IN operator is translated into either a RexSubQuery or a SqlStdOperatorTable.IN operator.
I think Vino was referring to the first case.
For the second case (I think that's what you are facing here), the elements are converted into tuples, and the largest tuple Flink currently provides is Tuple25.java; I was wondering if that is the issue you are facing. You can probably split the IN into several smaller IN predicates combined with OR, as sketched below.
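
A rough illustration of that workaround, assuming each group stays below the problematic size (only the first ten ids from the original list are shown):

WHERE
    appName='play.statistics.trace' and
    ( trackId in (124427150,71648998,124493327,524043,27300837)
      or trackId in (30300481,27300809,124744768,45982512,124526566)
      -- ... further IN groups for the remaining ids
    )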

--
Rong


Re: Does Flink SQL "IN" operation have a length limit?

Hequn Cheng
Hi, 

I haven't looked into the code, but if this is limited by Tuple, would it be better to implement it with Row?

Best, Hequn


Re: Does Flink SQL "IN" operation have a length limit?

Rong Rong
Yes. 

Thanks for bringing this up, Hequn! :-) I think Tuple would not be the best container to use.

However, in searching for an alternative, wouldn't a Collection / List be a more suitable solution? Row does not seem to fit the context, since a Row can hold elements of different types.
I vaguely recall a similar JIRA, though it might not be related to the IN clause. Let me try to dig it up.

--
Rong


Re: Does Flink SQL "IN" operation have a length limit?

Timo Walther
Hi,

Tuple should not be used anywhere in flink-table. @Rong, can you point us to the corresponding code? I haven't looked into it myself, but we should definitely support this query. @Henry, feel free to open an issue for it.

Regards,
Timo


Re: Does Flink SQL "IN" operation have a length limit?

Fabian Hueske-2
Hi,

I had a look into the code. From what I saw, we are translating the values into Rows.
The problem here is that the IN clause is translated into a join and that the join result contains a time attribute field. Rejecting such a plan is a safety restriction to ensure that time attributes do not lose their watermark alignment, because joins can return their results in random order. This should be related to, or the same as, [1].

Anyway, we should not translate IN clauses into joins for incrementally evaluated queries (a.k.a. streaming queries).
The main problem is that the join materializes both inputs, which is fine for the VALUES input but not for the stream.
I created FLINK-10474 to fix the problem.

A workaround for the problem could be a user-defined scalar function that replaces the IN clause, as sketched below.
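
For illustration, a minimal sketch of such a function against Flink 1.6's Table API. The class name TrackIdFilter, the registered name isTrackedId, and the use of long for the id type are assumptions for this example; adjust the eval parameter to trackId's actual type. Only the first few ids are shown.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.table.functions.ScalarFunction;

// Membership test against a fixed set of ids. As a plain scalar function,
// the predicate stays a simple filter, so no join (and no materialization
// of the stream) is needed.
public class TrackIdFilter extends ScalarFunction {

    private static final Set<Long> TRACK_IDS = new HashSet<>(Arrays.asList(
            124427150L, 71648998L, 124493327L, 524043L, 27300837L
            /* ..., remaining ids */));

    public boolean eval(long trackId) {
        return TRACK_IDS.contains(trackId);
    }
}

After registering it with tableEnv.registerFunction("isTrackedId", new TrackIdFilter()), the WHERE clause becomes: appName='play.statistics.trace' and isTrackedId(trackId).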

Best, Fabian


Re: Does Flink SQL "IN" operation have a length limit?

Rong Rong
Hi Fabian. Yes, I think that is what I missed. I hadn't looked into the code, just inferred from the translated plan Henry pasted.

Let me try to take a look and put in a fix for this.

Thanks,
Rong
