Questions about UDTF in flink SQL

Questions about UDTF in flink SQL

wangsan
Hi all,

When using a user-defined table function (UDTF) in Flink SQL, it seems that the result type of the table function must be deterministic.

If I want a UDTF whose result type is determined by its input parameters, what should I do?

What I want to do is like this:

```
SELECT input, f1, f2, f3, f4, f5 FROM MyTable, LATERAL TABLE(unnest_udtf(input, v1, v2)) AS T1(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) AS T2(f3, f4, f5)
```

I could certainly register the same UDTF under different names with different configurations, but I guess that’s not a good idea :(.

If this is not possible in Flink SQL for now, maybe we should consider this feature for the future?

Best,
wangsan

Re: Questions about UDTF in flink SQL

Timo Walther
Hi Wangsan,

Currently, UDFs have very strict result type assumptions. This is
necessary to determine the serializers for the cluster. There have been
multiple requests for more flexible handling of types in UDFs.

Please have a look at:
- [FLINK-7358] Add implicitly converts support for User-defined function
- [FLINK-9294] [table] Improve type inference for UDFs with composite
parameter and/or result type
- [FLINK-10958] [table] Add overload support for user defined function

If you think those issues do not represent what you need, you can open a
new issue with a small example of the feature you think is missing.

Regards,
Timo


On 28.11.18 at 09:59, wangsan wrote:

> Hi all,
>
> When using a user-defined table function (UDTF) in Flink SQL, it seems that the result type of the table function must be deterministic.
>
> If I want a UDTF whose result type is determined by its input parameters, what should I do?
>
> What I want to do is like this:
>
> ```
> SELECT input, f1, f2, f3, f4, f5 FROM MyTable, LATERAL TABLE(unnest_udtf(input, v1, v2)) AS T1(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) AS T2(f3, f4, f5)
> ```
>
> I could certainly register the same UDTF under different names with different configurations, but I guess that’s not a good idea :(.
>
> If this is not possible in Flink SQL for now, maybe we should consider this feature for the future?
>
> Best,
> wangsan



Re: Questions about UDTF in flink SQL

Jark Wu-3
Hi Wangsan,

If I understand correctly, you want the return type of the UDTF to be determined by the actual arguments rather than being a fixed result type. For example:

udtf("int, string, long", inputField) returns a composite type with [f0: INT, f1: VARCHAR, f2: BIGINT]
udtf("int", inputField) returns an atomic type with [f0: INT]

This is an interesting and useful feature IMO. But it may require some modification of the current TableFunction API to
provide an additional `TypeInformation[T] getResultType(Object[] arguments, Class[] argTypes)` interface, which means it needs
more discussion in the community.
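
To make the idea concrete, here is a minimal, standalone sketch of how a result type could be derived from a literal type-string argument, as in the two `udtf(...)` examples above. It does not use any Flink classes: the class name `ResultTypeSketch`, the method `resolveResultType`, and the mapping from type names to SQL types are all hypothetical and only mirror the names used in the example. The proposed `getResultType(arguments, argTypes)` hook does not exist in the current API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ResultTypeSketch {
    // Hypothetical mapping from the type names in the example to SQL type names.
    private static final Map<String, String> SQL_TYPES = new LinkedHashMap<>();
    static {
        SQL_TYPES.put("int", "INT");
        SQL_TYPES.put("string", "VARCHAR");
        SQL_TYPES.put("long", "BIGINT");
    }

    /** Derives field descriptions (f0, f1, ...) from a comma-separated type string. */
    public static List<String> resolveResultType(String typeSpec) {
        List<String> fields = new ArrayList<>();
        String[] names = typeSpec.split(",");
        for (int i = 0; i < names.length; i++) {
            String key = names[i].trim().toLowerCase();
            String sqlType = SQL_TYPES.get(key);
            if (sqlType == null) {
                // Unknown names are rejected up front, before any row is processed.
                throw new IllegalArgumentException("Unsupported type: " + names[i].trim());
            }
            fields.add("f" + i + ": " + sqlType);
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(resolveResultType("int, string, long")); // [f0: INT, f1: VARCHAR, f2: BIGINT]
        System.out.println(resolveResultType("int"));               // [f0: INT]
    }
}
```

Note that this only works if the type string is a constant known when the query is planned. If it came from a column value, the result type could differ per row, and the serializers could not be chosen in advance.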

But you can create an issue if this is what you want and we can discuss how to support it.

Best,
Jark



On Thu, 29 Nov 2018 at 19:14, Timo Walther <[hidden email]> wrote:
Hi Wangsan,

Currently, UDFs have very strict result type assumptions. This is
necessary to determine the serializers for the cluster. There were
multiple requests for more flexible handling of types in UDFs.

Please have a look at:
- [FLINK-7358] Add implicitly converts support for User-defined function
- [FLINK-9294] [table] Improve type inference for UDFs with composite
parameter and/or result type
- [FLINK-10958] [table] Add overload support for user defined function

If you think those issues do not represent what you need, you can open a
new issue with a small example of the feature you think is missing.

Regards,
Timo



Re: Questions about UDTF in flink SQL

Rong Rong
Hi Wangsan,

If your requirement is essentially what Jark describes, we already have a proposal following up [FLINK-9294] in its related/parent task: [FLINK-9484]. We are already implementing some of these internally and have one PR ready for review for FLINK-9294.

Please take a look and see whether there are any additional features you would like to comment on or suggest.

Thanks,
Rong

On Fri, Nov 30, 2018 at 1:54 AM Jark Wu <[hidden email]> wrote:
Hi Wangsan,

If I understand correctly, you want the return type of the UDTF to be determined by the actual arguments rather than being a fixed result type. For example:

udtf("int, string, long", inputField) returns a composite type with [f0: INT, f1: VARCHAR, f2: BIGINT]
udtf("int", inputField) returns an atomic type with [f0: INT]

This is an interesting and useful feature IMO. But it may require some modification of the current TableFunction API to
provide an additional `TypeInformation[T] getResultType(Object[] arguments, Class[] argTypes)` interface, which means it needs
more discussion in the community.

But you can create an issue if this is what you want and we can discuss how to support it.

Best,
Jark




Re: Questions about UDTF in flink SQL

wangsan
Hi Rong,

Yes, what Jark described is exactly what I need. Currently we have a workaround for this problem: we use a UDF whose result type is a Map. I will take a look at your proposals and PR.
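
For readers wondering what such a workaround might look like, below is a rough, standalone sketch. It is not the actual implementation. The class name `UnnestToMapSketch`, the `eval` signature, and the `key=value;key=value` input format are all assumptions made purely for illustration. The point is that the declared result type stays a single fixed map type no matter how many keys are requested.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UnnestToMapSketch {
    /**
     * Extracts the requested keys from an input such as "v1=a;v2=b;v3=c".
     * The input format is an assumption made only for this sketch.
     */
    public static Map<String, String> eval(String input, String... keys) {
        // Parse every key=value pair found in the input.
        Map<String, String> all = new LinkedHashMap<>();
        for (String pair : input.split(";")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                all.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        // Keep only the requested keys, in the order they were requested.
        Map<String, String> result = new LinkedHashMap<>();
        for (String key : keys) {
            result.put(key, all.get(key)); // null when the key is absent
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(eval("v1=a;v2=b;v3=c", "v1", "v2")); // {v1=a, v2=b}
        System.out.println(eval("v1=a;v2=b;v3=c", "v3", "v4")); // {v3=c, v4=null}
    }
}
```

In a real job, logic like this would presumably live in the `eval` method of a scalar function, and individual values would be read from the returned map by key in the query. The trade-off is that all values share one type (strings here), so per-field types are lost.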

Thanks for your help and suggestions.

Best,
Wangsan


On Dec 1, 2018, at 7:30 AM, Rong Rong <[hidden email]> wrote:

Hi Wangsan,

If your requirement is essentially what Jark describes, we already have a proposal following up [FLINK-9294] in its related/parent task: [FLINK-9484]. We are already implementing some of these internally and have one PR ready for review for FLINK-9294.

Please take a look and see whether there are any additional features you would like to comment on or suggest.

Thanks,
Rong
