NoMatchingTableFactoryException when testing Flink SQL with Kafka in Flink 1.7


NoMatchingTableFactoryException when testing Flink SQL with Kafka in Flink 1.7

Joshua Fan
Hi, 

I want to test Flink SQL locally by consuming Kafka data with Flink 1.7, but it throws the exception below.

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=fetch.message.max.bytes
connector.properties.0.value=10485760
connector.properties.1.key=zookeeper.connect
connector.properties.1.value=10.xxx.:2181/kafka
connector.properties.2.key=group.id
connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21
connector.properties.3.key=bootstrap.servers
connector.properties.3.value=10.xxx:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=-flink-test
connector.type=kafka
connector.version=0.10
format.derive-schema=true
format.property-version=1
format.type=json
schema.0.name=rideId
schema.0.type=VARCHAR
schema.1.name=lon
schema.1.type=VARCHAR

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory

at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
at TableSourceFinder.main(TableSourceFinder.java:40)

Here is my code:
// Imports needed for this snippet (Flink 1.7 Table API):
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import java.util.Properties;
import java.util.UUID;

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(env);

        // Kafka consumer configuration
        Properties properties = new Properties();
        String zkString = "10.xxx:2181/kafka";
        String brokerList = "10.xxx:9092";
        properties.setProperty("fetch.message.max.bytes", "10485760");
        properties.setProperty("group.id", UUID.randomUUID().toString());
        properties.setProperty("zookeeper.connect", zkString);
        properties.setProperty("bootstrap.servers", brokerList);

        // Kafka connector descriptor
        Kafka kafka = new Kafka();
        kafka.version("0.8").topic("flink-test").properties(properties);
        kafka.startFromLatest();

        // Register the Kafka topic as a table source
        stEnv.connect(kafka)
                .withSchema(new Schema()
                        .field("rideId", Types.STRING())
                        .field("lon", Types.STRING()))
                .withFormat(new Json().deriveSchema())
                .registerTableSource("test");

        Table table = stEnv.sqlQuery("select rideId from test");
        DataStream ds = ((org.apache.flink.table.api.java.StreamTableEnvironment) stEnv)
                .toAppendStream(table, Types.STRING());
        ds.print();
        env.execute("KafkaSql");
}

And here are the relevant dependencies in my pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

As far as I can tell, all the required libraries are in the pom, so I don't know why it fails when I test locally.

Thank you for any hints.

Yours
Joshua

Re: NoMatchingTableFactoryException when testing Flink SQL with Kafka in Flink 1.7

Timo Walther
Hi Joshua,

According to the property list, you passed "connector.version=0.10", so a Kafka 0.8 factory will not match.

Are you sure you are compiling the right thing? There seems to be a mismatch between your screenshot and the exception.

Regards,
Timo
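
For illustration, a minimal sketch of the point above, assuming the Kafka 0.8 connector is the intended one (broker and topic values are placeholders): the version set on the descriptor must match one of the Kafka*TableSourceSinkFactory implementations on the classpath, which in turn comes from the connector dependency in the pom.

        // With flink-connector-kafka-0.8_2.11 on the classpath,
        // the descriptor must declare the same version:
        Kafka kafka = new Kafka()
                .version("0.8")   // must match the connector dependency
                .topic("flink-test")
                .properties(properties);
        // Conversely, version("0.10") would require flink-connector-kafka-0.10_2.11 instead.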


Re: NoMatchingTableFactoryException when testing Flink SQL with Kafka in Flink 1.7

Joshua Fan
Hi Timo

Thank you for your advice. It was indeed a typo. After I fixed it, though, the same exception remained.

But when I added inAppendMode() to the StreamTableDescriptor, the exception disappeared and the proper Kafka 0.8 factory was found; see the sketch below.
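
For reference, a minimal sketch of the fixed descriptor chain (same schema as in the first mail); inAppendMode() adds the update-mode property that the Kafka factory's context requires:

        stEnv.connect(kafka)
                .withSchema(new Schema()
                        .field("rideId", Types.STRING())
                        .field("lon", Types.STRING()))
                .withFormat(new Json().deriveSchema())
                .inAppendMode()   // sets update-mode=append, needed for the factory to match
                .registerTableSource("test");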

Then another exception turned up:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values
 at [Source: [B@69e1cfbe; line: 1, column: 6]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2355)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:94)

But I actually produced JSON data to the topic, so why can't Flink deserialize it? It is weird.

Yours 
Joshua


Re: NoMatchingTableFactoryException when testing Flink SQL with Kafka in Flink 1.7

Zhenghua Gao
Maybe you're generating non-standard JSON records.
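
For illustration, a sketch of producing records the JSON format can parse (placeholder broker address, hypothetical field values): each Kafka record must be a single well-formed JSON document matching the declared schema.

        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import java.util.Properties;

        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "10.xxx:9092");
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            // OK: one JSON object per record, fields match the declared schema
            producer.send(new ProducerRecord<>("flink-test", "{\"rideId\":\"42\",\"lon\":\"116.3\"}"));
            // A bare value such as 12345-abc would fail with exactly the error above:
            // "Unexpected character ('-') ... Expected space separating root-level values"
        }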




Re: NoMatchingTableFactoryException when testing Flink SQL with Kafka in Flink 1.7

Joshua Fan
Hi Zhenghua,

Yes, the topic was polluted somehow. After I created a new topic to consume from, it works fine now.

Yours sincerely
Joshua
