Hi,

I want to test Flink SQL locally by consuming Kafka data in Flink 1.7, but it throws an exception like the one below:

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath.
Reason: No context matches.

The following properties are requested:
connector.properties.0.key=fetch.message.max.bytes
connector.properties.0.value=10485760
connector.properties.1.key=zookeeper.connect
connector.properties.1.value=10.xxx.:2181/kafka
connector.properties.2.key=group.id
connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21
connector.properties.3.key=bootstrap.servers
connector.properties.3.value=10.xxx:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=-flink-test
connector.type=kafka
connector.version=0.10
format.derive-schema=true
format.property-version=1
format.type=json
schema.0.name=rideId
schema.0.type=VARCHAR
schema.1.name=lon
schema.1.type=VARCHAR

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
	at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
	at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
	at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
	at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
	at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
	at TableSourceFinder.main(TableSourceFinder.java:40)

Here is my code:

public static void main(String[] args) throws Exception{ }

And here is my pom.xml:

<dependency>
</dependency>
<dependency>
</dependency>

In my opinion, I have all the libs in the pom; I don't know why it fails when testing locally. Thank you for any hints.

Yours,
Joshua
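(The pom contents did not survive the archive. For reference, a Flink 1.7 Table API job that reads Kafka 0.8 with a JSON format typically needs dependencies along these lines; the version 1.7.1 and the Scala 2.11 suffix are assumptions here, adjust to your build:)

```xml
<!-- Sketch of the dependencies a Flink 1.7 Table API + Kafka 0.8 + JSON job
     usually needs; 1.7.1 / Scala 2.11 are assumptions. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.7.1</version>
</dependency>
```

Note that the connector version declared in the table descriptor must match the connector dependency on the classpath, which is what the factory-matching exception above is complaining about.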
Hi Joshua,
According to the property list, you passed "connector.version=0.10", so a Kafka 0.8 factory will not match.

Are you sure you are compiling the right thing? There seems to be a mismatch between your screenshot and the exception.
Regards,
Timo
On 11.01.19 at 15:43, Joshua Fan wrote:
Hi Timo,

Thank you for your advice. It was truly a typo. After I fixed it, the same exception remained, but when I added inAppendMode() to the StreamTableDescriptor, the exception disappeared and it found the proper Kafka08Factory. Then another exception turned up:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values
 at [Source: [B@69e1cfbe; line: 1, column: 6]
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2355)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:94)

But I actually produced JSON data to the topic, so why can Flink not deserialize it? It is weird.

Yours,
Joshua

On Fri, Jan 11, 2019 at 11:02 PM Timo Walther <[hidden email]> wrote:
Maybe you're generating a non-standard JSON record.
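As a quick illustration outside Flink (a Python sketch; the sample payloads here are made up, not taken from the actual topic): a Kafka record whose bytes do not form a single JSON document, such as a number followed by extra characters, fails root-level parsing in exactly this way. Flink's shaded Jackson reports this as "Expected space separating root-level values".

```python
import json

# Hypothetical polluted record: not a single valid JSON document.
# After parsing the root-level number 10485 the parser hits '-',
# analogous to Jackson's "Expected space separating root-level values".
bad_record = "10485-76"
try:
    json.loads(bad_record)
except json.JSONDecodeError as e:
    print("parse failed:", e)

# A well-formed record matching the declared schema (rideId, lon as
# VARCHAR/strings) parses fine.
good_record = '{"rideId": "42", "lon": "8.6821"}'
row = json.loads(good_record)
print(row["rideId"], row["lon"])
```

So it is worth dumping a few raw messages from the topic and checking that each one is a standalone JSON object before blaming the deserializer.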
Hi Zhenghua,

Yes, the topic was polluted somehow. After I created a new topic to consume from, it is OK now.

Yours sincerely,
Joshua

On Tue, Jan 15, 2019 at 4:28 PM Zhenghua Gao <[hidden email]> wrote:

> May be you're generating non-standard JSON record.