Using key.fields in 1.12

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Using key.fields in 1.12

Aeden Jameson
I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafa SQL Connector. My current connector is configured
as ,

connector.type    = 'kafka'
connector.version = 'universal'
connector.topic   = 'my-topic'
connector.properties.group.id = 'my-consumer-group'
connector.properties.bootstrap.servers = '...'
format.type = 'avro'
format.avro-schema = '....'

I tried adding

key.fields = 'my_key_field'

as well as

key.format = 'avro'
key.fields = 'my_key_field'

but I get the exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
key.fields
key.format

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
        at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
        at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
        ... 21 more

I have validated that the uber jar clearly contains the 1.12
dependencies. What is that magic combination of properties to get
key.fields to work? Or is it not supported with avro?

--
Thank You,
Aeden
Reply | Threaded
Open this post in threaded view
|

Re: Using key.fields in 1.12

Piotr Nowojski-4
Hey,

have you added Kafka connector as the dependency? [1]


Best,
Piotrek

śr., 6 sty 2021 o 04:37 Aeden Jameson <[hidden email]> napisał(a):
I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafa SQL Connector. My current connector is configured
as ,

connector.type    = 'kafka'
connector.version = 'universal'
connector.topic   = 'my-topic'
connector.properties.group.id = 'my-consumer-group'
connector.properties.bootstrap.servers = '...'
format.type = 'avro'
format.avro-schema = '....'

I tried adding

key.fields = 'my_key_field'

as well as

key.format = 'avro'
key.fields = 'my_key_field'

but I get the exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
key.fields
key.format

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
        at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
        at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
        ... 21 more

I have validated that the uber jar clearly contains the 1.12
dependencies. What is that magic combination of properties to get
key.fields to work? Or is it not supported with avro?

--
Thank You,
Aeden
Reply | Threaded
Open this post in threaded view
|

Re: Using key.fields in 1.12

Aeden Jameson
Yes, I do have that dependency. I see it in the dependency view of
intellij and directly. in the uber jar. Thanks for responding.

- Aeden

On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <[hidden email]> wrote:

>
> Hey,
>
> have you added Kafka connector as the dependency? [1]
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
>
> Best,
> Piotrek
>
> śr., 6 sty 2021 o 04:37 Aeden Jameson <[hidden email]> napisał(a):
>>
>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
>> feature of the Kafa SQL Connector. My current connector is configured
>> as ,
>>
>> connector.type    = 'kafka'
>> connector.version = 'universal'
>> connector.topic   = 'my-topic'
>> connector.properties.group.id = 'my-consumer-group'
>> connector.properties.bootstrap.servers = '...'
>> format.type = 'avro'
>> format.avro-schema = '....'
>>
>> I tried adding
>>
>> key.fields = 'my_key_field'
>>
>> as well as
>>
>> key.format = 'avro'
>> key.fields = 'my_key_field'
>>
>> but I get the exception
>>
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> Could not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>>
>> Reason: No factory supports all properties.
>>
>> The matching candidates:
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> Unsupported property keys:
>> key.fields
>> key.format
>>
>> The following factories have been considered:
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>         at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
>>         at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
>>         at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>         at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>         at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
>>         ... 21 more
>>
>> I have validated that the uber jar clearly contains the 1.12
>> dependencies. What is that magic combination of properties to get
>> key.fields to work? Or is it not supported with avro?
>>
>> --
>> Thank You,
>> Aeden



--
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful
Reply | Threaded
Open this post in threaded view
|

Re: Using key.fields in 1.12

Timo Walther
Hi Aeden,

we updated the connector property design in 1.11 [1]. The old
translation layer exists for backwards compatibility and is indicated by
`connector.type=kafka`.

However, `connector = kafka` indicates the new property design and
`key.fields` is only available there. Please check all properties again
when upgrading, they are mentioned here [2].

Regards,
Timo


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/


On 06.01.21 18:35, Aeden Jameson wrote:

> Yes, I do have that dependency. I see it in the dependency view of
> intellij and directly. in the uber jar. Thanks for responding.
>
> - Aeden
>
> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <[hidden email]> wrote:
>>
>> Hey,
>>
>> have you added Kafka connector as the dependency? [1]
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
>>
>> Best,
>> Piotrek
>>
>> śr., 6 sty 2021 o 04:37 Aeden Jameson <[hidden email]> napisał(a):
>>>
>>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
>>> feature of the Kafa SQL Connector. My current connector is configured
>>> as ,
>>>
>>> connector.type    = 'kafka'
>>> connector.version = 'universal'
>>> connector.topic   = 'my-topic'
>>> connector.properties.group.id = 'my-consumer-group'
>>> connector.properties.bootstrap.servers = '...'
>>> format.type = 'avro'
>>> format.avro-schema = '....'
>>>
>>> I tried adding
>>>
>>> key.fields = 'my_key_field'
>>>
>>> as well as
>>>
>>> key.format = 'avro'
>>> key.fields = 'my_key_field'
>>>
>>> but I get the exception
>>>
>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>> Could not find a suitable table factory for
>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>> the classpath.
>>>
>>> Reason: No factory supports all properties.
>>>
>>> The matching candidates:
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>> Unsupported property keys:
>>> key.fields
>>> key.format
>>>
>>> The following factories have been considered:
>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>          at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
>>>          at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
>>>          at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>          at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>          at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
>>>          ... 21 more
>>>
>>> I have validated that the uber jar clearly contains the 1.12
>>> dependencies. What is that magic combination of properties to get
>>> key.fields to work? Or is it not supported with avro?
>>>
>>> --
>>> Thank You,
>>> Aeden
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: Using key.fields in 1.12

Aeden Jameson
Hi Timo,

Thanks for responding. You're right. So I did update the properties.
From what I can tell the new design you're referring to uses the
KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
support those options. Is that right? So I updated my configuration to

connector    = 'kafka'
topic   = 'my-topic'
properties.group.id = 'my-consumer-group'
properties.bootstrap.servers = '...'
format = 'avro'
format.avro-schema = '....'
key.fields = 'my_key_field'

However, the property format.avro-schema doesn't appear to be
supported by KafkaDynamicTableFactory. I get this exception.

Caused by: org.apache.flink.table.api.ValidationException: Unsupported
options found for connector 'kafka'.

Unsupported options:

format.avro-schema

Supported options:

connector
format
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id
property-version
scan.startup.mode
scan.startup.specific-offsets
scan.startup.timestamp-millis
scan.topic-partition-discovery.interval
sink.parallelism
sink.partitioner
sink.semantic
topic
topic-pattern
value.fields-include
value.format
        at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
        at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
        at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
        ... 21 more

FAILURE: Build failed with an exception.




The format.avro-schema property was supported it what looks to me the
old design in in KafkaTableSourceSinkFactoryBase with this line,

    properties.add(FORMAT + ".*");

    https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160

Does format.avro-schema need to be specified differently?

Thank you,
Aeden

On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <[hidden email]> wrote:

>
> Hi Aeden,
>
> we updated the connector property design in 1.11 [1]. The old
> translation layer exists for backwards compatibility and is indicated by
> `connector.type=kafka`.
>
> However, `connector = kafka` indicates the new property design and
> `key.fields` is only available there. Please check all properties again
> when upgrading, they are mentioned here [2].
>
> Regards,
> Timo
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
>
>
> On 06.01.21 18:35, Aeden Jameson wrote:
> > Yes, I do have that dependency. I see it in the dependency view of
> > intellij and directly. in the uber jar. Thanks for responding.
> >
> > - Aeden
> >
> > On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <[hidden email]> wrote:
> >>
> >> Hey,
> >>
> >> have you added Kafka connector as the dependency? [1]
> >>
> >> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
> >>
> >> Best,
> >> Piotrek
> >>
> >> śr., 6 sty 2021 o 04:37 Aeden Jameson <[hidden email]> napisał(a):
> >>>
> >>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
> >>> feature of the Kafa SQL Connector. My current connector is configured
> >>> as ,
> >>>
> >>> connector.type    = 'kafka'
> >>> connector.version = 'universal'
> >>> connector.topic   = 'my-topic'
> >>> connector.properties.group.id = 'my-consumer-group'
> >>> connector.properties.bootstrap.servers = '...'
> >>> format.type = 'avro'
> >>> format.avro-schema = '....'
> >>>
> >>> I tried adding
> >>>
> >>> key.fields = 'my_key_field'
> >>>
> >>> as well as
> >>>
> >>> key.format = 'avro'
> >>> key.fields = 'my_key_field'
> >>>
> >>> but I get the exception
> >>>
> >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> >>> Could not find a suitable table factory for
> >>> 'org.apache.flink.table.factories.TableSourceFactory' in
> >>> the classpath.
> >>>
> >>> Reason: No factory supports all properties.
> >>>
> >>> The matching candidates:
> >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>> Unsupported property keys:
> >>> key.fields
> >>> key.format
> >>>
> >>> The following factories have been considered:
> >>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> >>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>>          at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> >>>          at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
> >>>          at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> >>>          at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> >>>          at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
> >>>          ... 21 more
> >>>
> >>> I have validated that the uber jar clearly contains the 1.12
> >>> dependencies. What is that magic combination of properties to get
> >>> key.fields to work? Or is it not supported with avro?
> >>>
> >>> --
> >>> Thank You,
> >>> Aeden
> >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Using key.fields in 1.12

Timo Walther
Hi Aeden,

`format.avro-schema` is not required anymore in the new design. The Avro
schema is derived entirely from the table's schema.

Regards,
Timo



On 07.01.21 09:41, Aeden Jameson wrote:

> Hi Timo,
>
> Thanks for responding. You're right. So I did update the properties.
>>From what I can tell the new design you're referring to uses the
> KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
> options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
> support those options. Is that right? So I updated my configuration to
>
> connector    = 'kafka'
> topic   = 'my-topic'
> properties.group.id = 'my-consumer-group'
> properties.bootstrap.servers = '...'
> format = 'avro'
> format.avro-schema = '....'
> key.fields = 'my_key_field'
>
> However, the property format.avro-schema doesn't appear to be
> supported by KafkaDynamicTableFactory. I get this exception.
>
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> options found for connector 'kafka'.
>
> Unsupported options:
>
> format.avro-schema
>
> Supported options:
>
> connector
> format
> key.fields
> key.fields-prefix
> key.format
> properties.bootstrap.servers
> properties.group.id
> property-version
> scan.startup.mode
> scan.startup.specific-offsets
> scan.startup.timestamp-millis
> scan.topic-partition-discovery.interval
> sink.parallelism
> sink.partitioner
> sink.semantic
> topic
> topic-pattern
> value.fields-include
> value.format
>          at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
>          at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
>          at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
>          at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
>          at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
>          ... 21 more
>
> FAILURE: Build failed with an exception.
>
>
>
>
> The format.avro-schema property was supported it what looks to me the
> old design in in KafkaTableSourceSinkFactoryBase with this line,
>
>      properties.add(FORMAT + ".*");
>
>      https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
>
> Does format.avro-schema need to be specified differently?
>
> Thank you,
> Aeden
>
> On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <[hidden email]> wrote:
>>
>> Hi Aeden,
>>
>> we updated the connector property design in 1.11 [1]. The old
>> translation layer exists for backwards compatibility and is indicated by
>> `connector.type=kafka`.
>>
>> However, `connector = kafka` indicates the new property design and
>> `key.fields` is only available there. Please check all properties again
>> when upgrading, they are mentioned here [2].
>>
>> Regards,
>> Timo
>>
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
>>
>>
>> On 06.01.21 18:35, Aeden Jameson wrote:
>>> Yes, I do have that dependency. I see it in the dependency view of
>>> intellij and directly. in the uber jar. Thanks for responding.
>>>
>>> - Aeden
>>>
>>> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <[hidden email]> wrote:
>>>>
>>>> Hey,
>>>>
>>>> have you added Kafka connector as the dependency? [1]
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> śr., 6 sty 2021 o 04:37 Aeden Jameson <[hidden email]> napisał(a):
>>>>>
>>>>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
>>>>> feature of the Kafa SQL Connector. My current connector is configured
>>>>> as ,
>>>>>
>>>>> connector.type    = 'kafka'
>>>>> connector.version = 'universal'
>>>>> connector.topic   = 'my-topic'
>>>>> connector.properties.group.id = 'my-consumer-group'
>>>>> connector.properties.bootstrap.servers = '...'
>>>>> format.type = 'avro'
>>>>> format.avro-schema = '....'
>>>>>
>>>>> I tried adding
>>>>>
>>>>> key.fields = 'my_key_field'
>>>>>
>>>>> as well as
>>>>>
>>>>> key.format = 'avro'
>>>>> key.fields = 'my_key_field'
>>>>>
>>>>> but I get the exception
>>>>>
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>>>> Could not find a suitable table factory for
>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>> the classpath.
>>>>>
>>>>> Reason: No factory supports all properties.
>>>>>
>>>>> The matching candidates:
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>> Unsupported property keys:
>>>>> key.fields
>>>>> key.format
>>>>>
>>>>> The following factories have been considered:
>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>>           at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
>>>>>           at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
>>>>>           at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>>           at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>>           at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
>>>>>           ... 21 more
>>>>>
>>>>> I have validated that the uber jar clearly contains the 1.12
>>>>> dependencies. What is that magic combination of properties to get
>>>>> key.fields to work? Or is it not supported with avro?
>>>>>
>>>>> --
>>>>> Thank You,
>>>>> Aeden
>>>
>>>
>>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Using key.fields in 1.12

Aeden Jameson
 Brilliant, thank you. That will come in handy. I was looking through docs hoping there was a way to still specify the schema with no luck. Does such an option exist?

On Thu, Jan 7, 2021 at 2:33 AM Timo Walther <[hidden email]> wrote:
Hi Aeden,

`format.avro-schema` is not required anymore in the new design. The Avro
schema is derived entirely from the table's schema.

Regards,
Timo



On 07.01.21 09:41, Aeden Jameson wrote:
> Hi Timo,
>
> Thanks for responding. You're right. So I did update the properties.
>>From what I can tell the new design you're referring to uses the
> KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
> options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
> support those options. Is that right? So I updated my configuration to
>
> connector    = 'kafka'
> topic   = 'my-topic'
> properties.group.id = 'my-consumer-group'
> properties.bootstrap.servers = '...'
> format = 'avro'
> format.avro-schema = '....'
> key.fields = 'my_key_field'
>
> However, the property format.avro-schema doesn't appear to be
> supported by KafkaDynamicTableFactory. I get this exception.
>
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> options found for connector 'kafka'.
>
> Unsupported options:
>
> format.avro-schema
>
> Supported options:
>
> connector
> format
> key.fields
> key.fields-prefix
> key.format
> properties.bootstrap.servers
> properties.group.id
> property-version
> scan.startup.mode
> scan.startup.specific-offsets
> scan.startup.timestamp-millis
> scan.topic-partition-discovery.interval
> sink.parallelism
> sink.partitioner
> sink.semantic
> topic
> topic-pattern
> value.fields-include
> value.format
>          at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
>          at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
>          at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
>          at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
>          at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
>          ... 21 more
>
> FAILURE: Build failed with an exception.
>
>
>
>
> The format.avro-schema property was supported it what looks to me the
> old design in in KafkaTableSourceSinkFactoryBase with this line,
>
>      properties.add(FORMAT + ".*");
>
>      https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
>
> Does format.avro-schema need to be specified differently?
>
> Thank you,
> Aeden
>
> On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <[hidden email]> wrote:
>>
>> Hi Aeden,
>>
>> we updated the connector property design in 1.11 [1]. The old
>> translation layer exists for backwards compatibility and is indicated by
>> `connector.type=kafka`.
>>
>> However, `connector = kafka` indicates the new property design and
>> `key.fields` is only available there. Please check all properties again
>> when upgrading, they are mentioned here [2].
>>
>> Regards,
>> Timo
>>
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
>>
>>
>> On 06.01.21 18:35, Aeden Jameson wrote:
>>> Yes, I do have that dependency. I see it in the dependency view of
>>> intellij and directly. in the uber jar. Thanks for responding.
>>>
>>> - Aeden
>>>
>>> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <[hidden email]> wrote:
>>>>
>>>> Hey,
>>>>
>>>> have you added Kafka connector as the dependency? [1]
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> śr., 6 sty 2021 o 04:37 Aeden Jameson <[hidden email]> napisał(a):
>>>>>
>>>>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
>>>>> feature of the Kafa SQL Connector. My current connector is configured
>>>>> as ,
>>>>>
>>>>> connector.type    = 'kafka'
>>>>> connector.version = 'universal'
>>>>> connector.topic   = 'my-topic'
>>>>> connector.properties.group.id = 'my-consumer-group'
>>>>> connector.properties.bootstrap.servers = '...'
>>>>> format.type = 'avro'
>>>>> format.avro-schema = '....'
>>>>>
>>>>> I tried adding
>>>>>
>>>>> key.fields = 'my_key_field'
>>>>>
>>>>> as well as
>>>>>
>>>>> key.format = 'avro'
>>>>> key.fields = 'my_key_field'
>>>>>
>>>>> but I get the exception
>>>>>
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>>>> Could not find a suitable table factory for
>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>> the classpath.
>>>>>
>>>>> Reason: No factory supports all properties.
>>>>>
>>>>> The matching candidates:
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>> Unsupported property keys:
>>>>> key.fields
>>>>> key.format
>>>>>
>>>>> The following factories have been considered:
>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>>           at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
>>>>>           at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
>>>>>           at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>>           at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>>           at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
>>>>>           ... 21 more
>>>>>
>>>>> I have validated that the uber jar clearly contains the 1.12
>>>>> dependencies. What is that magic combination of properties to get
>>>>> key.fields to work? Or is it not supported with avro?
>>>>>
>>>>> --
>>>>> Thank You,
>>>>> Aeden
>>>
>>>
>>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Using key.fields in 1.12

Timo Walther
There are plans to also derive the table schema from Avro schema. But we
haven't decided on a syntax for this yet. For now, we only support this
through catalogs such as Confluent schema registry.

Regards,
Timo


On 07.01.21 21:42, Aeden Jameson wrote:

>   Brilliant, thank you. That will come in handy. I was looking through
> docs hoping there was a way to still specify the schema with no luck.
> Does such an option exist?
>
> On Thu, Jan 7, 2021 at 2:33 AM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi Aeden,
>
>     `format.avro-schema` is not required anymore in the new design. The
>     Avro
>     schema is derived entirely from the table's schema.
>
>     Regards,
>     Timo
>
>
>
>     On 07.01.21 09:41, Aeden Jameson wrote:
>      > Hi Timo,
>      >
>      > Thanks for responding. You're right. So I did update the properties.
>      >>From what I can tell the new design you're referring to uses the
>      > KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
>      > options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
>      > support those options. Is that right? So I updated my
>     configuration to
>      >
>      > connector    = 'kafka'
>      > topic   = 'my-topic'
>      > properties.group.id <http://properties.group.id> =
>     'my-consumer-group'
>      > properties.bootstrap.servers = '...'
>      > format = 'avro'
>      > format.avro-schema = '....'
>      > key.fields = 'my_key_field'
>      >
>      > However, the property format.avro-schema doesn't appear to be
>      > supported by KafkaDynamicTableFactory. I get this exception.
>      >
>      > Caused by: org.apache.flink.table.api.ValidationException:
>     Unsupported
>      > options found for connector 'kafka'.
>      >
>      > Unsupported options:
>      >
>      > format.avro-schema
>      >
>      > Supported options:
>      >
>      > connector
>      > format
>      > key.fields
>      > key.fields-prefix
>      > key.format
>      > properties.bootstrap.servers
>      > properties.group.id <http://properties.group.id>
>      > property-version
>      > scan.startup.mode
>      > scan.startup.specific-offsets
>      > scan.startup.timestamp-millis
>      > scan.topic-partition-discovery.interval
>      > sink.parallelism
>      > sink.partitioner
>      > sink.semantic
>      > topic
>      > topic-pattern
>      > value.fields-include
>      > value.format
>      >          at
>     org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
>      >          at
>     org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
>      >          at
>     org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
>      >          at
>     org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
>      >          at
>     org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
>      >          ... 21 more
>      >
>      > FAILURE: Build failed with an exception.
>      >
>      >
>      >
>      >
>      > The format.avro-schema property was supported it what looks to me the
>      > old design in in KafkaTableSourceSinkFactoryBase with this line,
>      >
>      >      properties.add(FORMAT + ".*");
>      >
>      >
>     https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
>     <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160>
>      >
>      > Does format.avro-schema need to be specified differently?
>      >
>      > Thank you,
>      > Aeden
>      >
>      > On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <[hidden email]
>     <mailto:[hidden email]>> wrote:
>      >>
>      >> Hi Aeden,
>      >>
>      >> we updated the connector property design in 1.11 [1]. The old
>      >> translation layer exists for backwards compatibility and is
>     indicated by
>      >> `connector.type=kafka`.
>      >>
>      >> However, `connector = kafka` indicates the new property design and
>      >> `key.fields` is only available there. Please check all
>     properties again
>      >> when upgrading, they are mentioned here [2].
>      >>
>      >> Regards,
>      >> Timo
>      >>
>      >>
>      >> [1]
>      >>
>     https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
>     <https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory>
>      >> [2]
>     https://ci.apache.org/projects/flink/flink-docs-release-1.12/
>     <https://ci.apache.org/projects/flink/flink-docs-release-1.12/>
>      >>
>      >>
>      >> On 06.01.21 18:35, Aeden Jameson wrote:
>      >>> Yes, I do have that dependency. I see it in the dependency view of
>      >>> intellij and directly. in the uber jar. Thanks for responding.
>      >>>
>      >>> - Aeden
>      >>>
>      >>> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski
>     <[hidden email] <mailto:[hidden email]>> wrote:
>      >>>>
>      >>>> Hey,
>      >>>>
>      >>>> have you added Kafka connector as the dependency? [1]
>      >>>>
>      >>>> [1]
>     https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
>     <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies>
>      >>>>
>      >>>> Best,
>      >>>> Piotrek
>      >>>>
>      >>>> śr., 6 sty 2021 o 04:37 Aeden Jameson <[hidden email]
>     <mailto:[hidden email]>> napisał(a):
>      >>>>>
>      >>>>> I've upgraded from 1.11.1 to 1.12 in hopes of using the
>     key.fields
>      >>>>> feature of the Kafa SQL Connector. My current connector is
>     configured
>      >>>>> as ,
>      >>>>>
>      >>>>> connector.type    = 'kafka'
>      >>>>> connector.version = 'universal'
>      >>>>> connector.topic   = 'my-topic'
>      >>>>> connector.properties.group.id
>     <http://connector.properties.group.id> = 'my-consumer-group'
>      >>>>> connector.properties.bootstrap.servers = '...'
>      >>>>> format.type = 'avro'
>      >>>>> format.avro-schema = '....'
>      >>>>>
>      >>>>> I tried adding
>      >>>>>
>      >>>>> key.fields = 'my_key_field'
>      >>>>>
>      >>>>> as well as
>      >>>>>
>      >>>>> key.format = 'avro'
>      >>>>> key.fields = 'my_key_field'
>      >>>>>
>      >>>>> but I get the exception
>      >>>>>
>      >>>>> Caused by:
>     org.apache.flink.table.api.NoMatchingTableFactoryException:
>      >>>>> Could not find a suitable table factory for
>      >>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>      >>>>> the classpath.
>      >>>>>
>      >>>>> Reason: No factory supports all properties.
>      >>>>>
>      >>>>> The matching candidates:
>      >>>>>
>     org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>      >>>>> Unsupported property keys:
>      >>>>> key.fields
>      >>>>> key.format
>      >>>>>
>      >>>>> The following factories have been considered:
>      >>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>      >>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>      >>>>>
>     org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>      >>>>>           at
>     org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
>      >>>>>           at
>     org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
>      >>>>>           at
>     org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>      >>>>>           at
>     org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>      >>>>>           at
>     org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
>      >>>>>           ... 21 more
>      >>>>>
>      >>>>> I have validated that the uber jar clearly contains the 1.12
>      >>>>> dependencies. What is that magic combination of properties to get
>      >>>>> key.fields to work? Or is it not supported with avro?
>      >>>>>
>      >>>>> --
>      >>>>> Thank You,
>      >>>>> Aeden
>      >>>
>      >>>
>      >>>
>      >>
>      >
>