Flink sql case when problem

Flink sql case when problem

纳兰清风
Hello,

    When I use a CASE WHEN statement in Flink SQL, I get the following error:

org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common type of GeneratedExpression(field$3,isNull$3,,INT,None) and ArrayBuffer(GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData) str$4),false,,CHAR(0) NOT NULL,Some()), GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData) str$5),false,,CHAR(1) NOT NULL,Some(0))).
at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:354)
at scala.Option.orElse(Option.scala:289)
at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:354)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:141)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
......

My SQL is:
create table source_tb (
  a varchar,
  b INTEGER
) with (
  'connector' = 'kafka',
  ...
);

create table sink_tb (
  a varchar,
  c varchar
) with (
  'connector' = 'console',
  'format' = 'rich-json'
);

insert into sink_tb(a, c)
select a,
  case
    when b is null or b = '' or b = '0' then '1000-01-01 00:00:00'
    else from_unixtime(cast(b as bigint))
  end as c
from source_tb;

But it works fine when I change the WHEN condition to b is null or b in ('', '0').
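
That is, roughly this variant of the statement runs without the exception (same tables as above):

insert into sink_tb(a, c)
select a,
  case
    when b is null or b in ('', '0') then '1000-01-01 00:00:00'
    else from_unixtime(cast(b as bigint))
  end as c
from source_tb;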

Does anyone have an idea about this?



 

Re: Flink sql case when problem

Leonard Xu
Hi, houying

It looks like a bug in the operator code generation. Which Flink version are you using?
Could you help create a JIRA ticket?


Best,
Leonard


Re:Re: Flink sql case when problem

纳兰清风
Hi Leonard Xu,

    The version is 1.13. Is it a bug? I noticed that the type of column `b` is INTEGER, but I use it as VARCHAR.
    What should the expected behavior be?


Re: Flink sql case when problem

JING ZHANG
In reply to this post by Leonard Xu
Hi houying,
The root cause of the `CodeGenException` is comparing INTEGER with VARCHAR (b is INTEGER; '' and '0' are VARCHAR).
The problem could be solved by changing the type of b from INTEGER to VARCHAR.
Note that comparing INTEGER with VARCHAR may introduce other unexpected results. For example, in your demo above, for select * from source_tb where b = '1', the condition b = '1' would return false for records with b = 1.
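
For example, a minimal sketch of the two directions (assuming the same DDL as in your mail, only the relevant parts shown; the second option is just an alternative idea, not the only possible fix):

-- Option 1: declare b as VARCHAR in the source table, so the string comparisons type-check
create table source_tb (
  a varchar,
  b varchar
) with (
  'connector' = 'kafka',
  ...
);

-- Option 2: keep b as INTEGER and compare against an integer literal instead of '' / '0'
insert into sink_tb(a, c)
select a,
  case
    when b is null or b = 0 then '1000-01-01 00:00:00'
    else from_unixtime(cast(b as bigint))
  end as c
from source_tb;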

Then let's analyze why the two queries in your examples lead to different results.
> Why is a `CodeGenException` thrown when using b = '' or b = '0'?
Calcite generates a `SEARCH` operator. When generating code for the expression, the left expression is b with result type IntType, while the right expressions are '' and '0' have result type CharType, so a `CodeGenException` is thrown.

> But it works well when I change the when statement to b in ('', '0')
Because Calcite converts `IN` to `OR` when converting SQL to RelNode, since Flink sets the conversion threshold to Integer.MAX_VALUE.
After converting `IN` to `OR`, Calcite first casts '' and '0' to the type of b explicitly, then compares them to b.
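
Conceptually, after this rewrite the condition has roughly the following shape (just a sketch of the idea, not the exact expression the planner produces):

b IS NULL OR b = CAST('' AS INTEGER) OR b = CAST('0' AS INTEGER)
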
Please note that if the conversion threshold were lowered from Integer.MAX_VALUE to 1, the query would also throw an exception when converting SQL to RelNode, like the following:

java.lang.ClassCastException: org.apache.calcite.util.NlsString cannot be cast to java.math.BigDecimal

at org.apache.calcite.sql2rel.SqlToRelConverter.convertLiteralInValuesList(SqlToRelConverter.java:1759)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertRowValues(SqlToRelConverter.java:1685)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryOrInList(SqlToRelConverter.java:1620)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertExists(SqlToRelConverter.java:1603)
at org.apache.calcite.sql2rel.SqlToRelConverter.substituteSubQuery(SqlToRelConverter.java:1170)
at org.apache.calcite.sql2rel.SqlToRelConverter.replaceSubQueries(SqlToRelConverter.java:1063)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4185)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:687)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1056)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1025)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:302)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:640)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:291)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:678)

@Leonard
Do we need to add improvements to the Flink framework to solve this problem?
If yes, we could create a JIRA ticket to track this and discuss how to improve it.
BTW, there is another related issue that needs discussion: for select * from source_tb where b = '1', the condition b = '1' returns false for records with b = 1, which is different from the behavior of Postgres and MySQL.

Best regards,
JING ZHANG

Re: Flink sql case when problem

Jacky Yin 殷传旺
Hello Jing,

Regarding the conversion (from 'IN' to 'OR') threshold, could you please explain it in more detail? Is it the count of the items in the 'IN' clause?


BR,
Jacky

Re: Flink sql case when problem

JING ZHANG
Hi Jacky,
Yes, the threshold is the count of items in the 'IN' clause.
The threshold value is defined in `PlannerContext#getSqlToRelConverterConfig`. It is currently set to Integer.MAX_VALUE, so Calcite always converts `IN` to `OR` when converting SQL to RelNode.
Best regards,
JING ZHANG
