question about concating an array in flink sql

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

question about concating an array in flink sql

vtygoss


Hi,


I have below use case


I want concat an array<string> using comma separator, but got exception “Cannot apply 'CONCAT_WS' to arguments of type 'CONCAT_WS(<CHAR(1)>, <VARCHAR(2147483647) ARRAY>)'. Supported form(s): 'CONCAT_WS(<STRING>)”。


How to concat an array in flink sql?  please help to offer some advices.


Regards 



```

[test case code]


val senv = StreamExecutionEnvironment.getExecutionEnvironment
val tenv = StreamTableEnvironment.create(senv)
import org.apache.flink.api.scala._
val dStream = senv.fromCollection(Array((1,Array("1a","1b","1c")),(2,Array("2d","2e","2f"))))
import org.apache.flink.table.api._
val table = tenv.fromDataStream(dStream, $("id"),$("names"))
tenv.createTemporaryView("test",table)
tenv.executeSql("select id, concat_ws(',',names) from test").print()

```



Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 12 to line 1, column 31: Cannot apply 'CONCAT_WS' to arguments of type 'CONCAT_WS(<CHAR(1)>, <VARCHAR(2147483647) ARRAY>)'. Supported form(s): 'CONCAT_WS(<STRING>)'

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)

at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)

at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)

at com.test.TestPk$.main(TestPk.scala:25)

at com.test.TestPk.main(TestPk.scala)

Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 12 to line 1, column 31: Cannot apply 'CONCAT_WS' to arguments of type 'CONCAT_WS(<CHAR(1)>, <VARCHAR(2147483647) ARRAY>)'. Supported form(s): 'CONCAT_WS(<STRING>)'

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)

at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)

at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)

at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)

at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)

at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)

at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)



Reply | Threaded
Open this post in threaded view
|

Re: question about concating an array in flink sql

JING ZHANG
Hi vtygoss,
If the length of names is fixed, please try this 'select id, concat_ws(',',names[1],names[2],names[3]) from test', and note begin with 1 instead of 0.
Else maybe you need to define a custom UDF which receives two arguments, first is string as separator, second is a string array as contents. Please check[1] for details.


Best regards,
JING ZHANG


vtygoss <[hidden email]> 于2021年6月10日周四 下午5:25写道:


Hi,


I have below use case


I want concat an array<string> using comma separator, but got exception “Cannot apply 'CONCAT_WS' to arguments of type 'CONCAT_WS(<CHAR(1)>, <VARCHAR(2147483647) ARRAY>)'. Supported form(s): 'CONCAT_WS(<STRING>)”。


How to concat an array in flink sql?  please help to offer some advices.


Regards 



```

[test case code]


val senv = StreamExecutionEnvironment.getExecutionEnvironment
val tenv = StreamTableEnvironment.create(senv)
import org.apache.flink.api.scala._
val dStream = senv.fromCollection(Array((1,Array("1a","1b","1c")),(2,Array("2d","2e","2f"))))
import org.apache.flink.table.api._
val table = tenv.fromDataStream(dStream, $("id"),$("names"))
tenv.createTemporaryView("test",table)
tenv.executeSql("select id, concat_ws(',',names) from test").print()

```



Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 12 to line 1, column 31: Cannot apply 'CONCAT_WS' to arguments of type 'CONCAT_WS(<CHAR(1)>, <VARCHAR(2147483647) ARRAY>)'. Supported form(s): 'CONCAT_WS(<STRING>)'

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)

at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)

at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)

at com.test.TestPk$.main(TestPk.scala:25)

at com.test.TestPk.main(TestPk.scala)

Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 12 to line 1, column 31: Cannot apply 'CONCAT_WS' to arguments of type 'CONCAT_WS(<CHAR(1)>, <VARCHAR(2147483647) ARRAY>)'. Supported form(s): 'CONCAT_WS(<STRING>)'

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)

at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)

at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)

at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)

at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)

at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)

at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)