I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafka SQL Connector. My current connector is configured
as follows:

    connector.type = 'kafka'
    connector.version = 'universal'
    connector.topic = 'my-topic'
    connector.properties.group.id = 'my-consumer-group'
    connector.properties.bootstrap.servers = '...'
    format.type = 'avro'
    format.avro-schema = '....'

I tried adding

    key.fields = 'my_key_field'

as well as

    key.format = 'avro'
    key.fields = 'my_key_field'

but I get the exception

    Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
    Could not find a suitable table factory for
    'org.apache.flink.table.factories.TableSourceFactory' in
    the classpath.

    Reason: No factory supports all properties.

    The matching candidates:
    org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    Unsupported property keys:
    key.fields
    key.format

    The following factories have been considered:
    org.apache.flink.table.sources.CsvBatchTableSourceFactory
    org.apache.flink.table.sources.CsvAppendTableSourceFactory
    org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
        at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
        at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
        ... 21 more

I have validated that the uber jar clearly contains the 1.12
dependencies. What is the magic combination of properties to get
key.fields to work? Or is it not supported with Avro?

--
Thank You,
Aeden

Hey,

have you added the Kafka connector as a dependency? [1]

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies

Best,
Piotrek

Yes, I do have that dependency. I see it in the dependency view of
IntelliJ and directly in the uber jar. Thanks for responding.

- Aeden

--
Cheers,
Aeden
GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful

Hi Aeden,

We updated the connector property design in 1.11 [1]. The old
translation layer exists for backwards compatibility and is indicated by
`connector.type=kafka`.

However, `connector = kafka` indicates the new property design, and
`key.fields` is only available there. Please check all properties again
when upgrading; they are listed here [2].

Regards,
Timo

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/

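The renaming Timo describes can be sketched as a table definition. This is only an illustration under assumptions: the table name `my_table` and both columns are hypothetical, the topic, group id and elided broker address are taken from the original post, and the comments note which legacy key each option appears to replace.

```sql
-- Sketch only: table name and columns are hypothetical placeholders.
CREATE TABLE my_table (
  my_key_field STRING,
  some_value   STRING
) WITH (
  -- was: connector.type = 'kafka'
  -- (connector.version = 'universal' is assumed to be dropped; the new
  --  design does not appear to have a counterpart for it)
  'connector' = 'kafka',
  -- was: connector.topic
  'topic' = 'my-topic',
  -- was: connector.properties.group.id
  'properties.group.id' = 'my-consumer-group',
  -- was: connector.properties.bootstrap.servers
  'properties.bootstrap.servers' = '...',
  -- was: format.type = 'avro'
  'format' = 'avro'
);
```

The point of the sketch is that the `connector.` prefix disappears and `connector` itself becomes the single key that selects the new factory; mixing old and new keys in one definition is what the factory lookup rejects.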
Hi Timo,

Thanks for responding. You're right, so I updated the properties.
From what I can tell, the new design you're referring to uses the
KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
support those options. Is that right? So I updated my configuration to

    connector = 'kafka'
    topic = 'my-topic'
    properties.group.id = 'my-consumer-group'
    properties.bootstrap.servers = '...'
    format = 'avro'
    format.avro-schema = '....'
    key.fields = 'my_key_field'

However, the property format.avro-schema doesn't appear to be
supported by KafkaDynamicTableFactory. I get this exception:

    Caused by: org.apache.flink.table.api.ValidationException: Unsupported
    options found for connector 'kafka'.

    Unsupported options:

    format.avro-schema

    Supported options:

    connector
    format
    key.fields
    key.fields-prefix
    key.format
    properties.bootstrap.servers
    properties.group.id
    property-version
    scan.startup.mode
    scan.startup.specific-offsets
    scan.startup.timestamp-millis
    scan.topic-partition-discovery.interval
    sink.parallelism
    sink.partitioner
    sink.semantic
    topic
    topic-pattern
    value.fields-include
    value.format
        at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
        at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
        at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
        ... 21 more

    FAILURE: Build failed with an exception.

The format.avro-schema property was supported in what looks to me like
the old design, in KafkaTableSourceSinkFactoryBase, with this line:

    properties.add(FORMAT + ".*");

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160

Does format.avro-schema need to be specified differently?

Thank you,
Aeden

Hi Aeden,

`format.avro-schema` is not required anymore in the new design. The Avro
schema is derived entirely from the table's schema.

Regards,
Timo

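Put together, a definition that uses `key.fields` without `format.avro-schema` might look like the sketch below. The table name and the `some_value` column are hypothetical, and the pairing of `key.fields` with `key.format`, as well as the choice of `EXCEPT_KEY`, are assumptions based on the supported options listed in the exception above and on the 1.12 Kafka connector documentation, not something confirmed in this thread.

```sql
-- Sketch only: names and column types are hypothetical placeholders.
CREATE TABLE my_table (
  my_key_field STRING,  -- assumed to be carried in the Kafka record key
  some_value   BIGINT   -- hypothetical payload column
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.group.id' = 'my-consumer-group',
  'properties.bootstrap.servers' = '...',
  -- The key is decoded as Avro; its schema is derived from the listed key column.
  'key.format' = 'avro',
  'key.fields' = 'my_key_field',
  -- The value is decoded as Avro; its schema is derived from the remaining columns.
  'value.format' = 'avro',
  -- Assumption: the key field is not repeated inside the value payload.
  'value.fields-include' = 'EXCEPT_KEY'
);
```

Because no schema string is passed, the declared column types have to line up with the Avro schema the producer actually wrote; the column list is the only place that information now lives.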
Brilliant, thank you. That will come in handy. I was looking through
the docs hoping there was a way to still specify the schema, with no
luck. Does such an option exist?

There are plans to also derive the table schema from the Avro schema,
but we haven't decided on a syntax for this yet. For now, we only
support this through catalogs such as Confluent schema registry.

Regards,
Timo