SQL program throws exception when creating Kafka source with CSV format


SQL program throws exception when creating Kafka source with CSV format

Marvin777
When I register a Kafka message source with the CSV format, the error message is as follows:

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

Reason: No context matches.

BTW, the Flink version is 1.6.2.

Thanks,
Marvin

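(The attached screenshot is not preserved in this archive. Purely as an illustration, here is a minimal sketch of the kind of registration that triggers this exception on Flink 1.6; the topic, connection properties, and schema are placeholders, not taken from the original post.)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class KafkaCsvSourceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        tableEnv
            .connect(
                new Kafka()
                    .version("0.11")                                  // placeholder Kafka version
                    .topic("my-topic")                                // placeholder topic
                    .property("bootstrap.servers", "localhost:9092")  // placeholder brokers
                    .property("group.id", "my-group")
                    .startFromEarliest())
            // In Flink 1.6 the Csv descriptor is only backed by the file system
            // connector; combined with Kafka it fails at registration time with
            // NoMatchingTableFactoryException because no DeserializationSchemaFactory
            // is available for CSV.
            .withFormat(
                new Csv()
                    .field("word", Types.STRING())
                    .field("cnt", Types.LONG())
                    .fieldDelimiter(","))
            .withSchema(
                new Schema()
                    .field("word", Types.STRING())
                    .field("cnt", Types.LONG()))
            .inAppendMode()
            .registerTableSource("kafka_csv_source");
    }
}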


Re: SQL program throws exception when creating Kafka source with CSV format

Hequn Cheng
Hi Marvin,

I took a look at the Flink code. It seems we can't use the CSV format for Kafka; you can use JSON instead.
As the exception shows, Flink can't find a suitable DeserializationSchemaFactory. Currently, only the JSON and Avro formats provide a DeserializationSchemaFactory.
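(As a sketch of what that change looks like with the 1.6 descriptor API, assuming the flink-json dependency is on the classpath; the topic, properties, and schema below are placeholders.)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class KafkaJsonSourceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Same Kafka connector as in the failing example, but with the JSON format,
        // which ships a DeserializationSchemaFactory (in the flink-json module).
        tableEnv
            .connect(
                new Kafka()
                    .version("0.11")                                  // placeholder
                    .topic("my-topic")                                // placeholder
                    .property("bootstrap.servers", "localhost:9092")
                    .startFromEarliest())
            .withFormat(
                new Json()
                    .failOnMissingField(false)
                    .deriveSchema())       // derive the JSON fields from the table schema
            .withSchema(
                new Schema()
                    .field("word", Types.STRING())
                    .field("cnt", Types.LONG()))
            .inAppendMode()
            .registerTableSource("kafka_json_source");

        // SQL queries can now be run against "kafka_json_source" via tableEnv.sqlQuery(...).
    }
}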

Best, Hequn


Re: SQL program throws exception when creating Kafka source with CSV format

Timo Walther
In reply to this post by Marvin777
Hi Marvin,

the CSV format is not supported for Kafka so far. Only formats that carry the `DeserializationSchema` tag in the docs are supported.

Right now you have to implement your own DeserializationSchemaFactory or use JSON or Avro.
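(One possible workaround, sketched here as an assumption rather than something proposed in this thread: skip the SQL connector descriptors entirely, parse the CSV bytes with a hand-written DeserializationSchema in the DataStream API, and register the resulting stream as a table. Topic, properties, and field names are placeholders.)

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KafkaCsvWorkaround {

    /** Very simple CSV-to-Row schema: two comma-separated fields, no quoting or escaping. */
    public static class SimpleCsvRowSchema implements DeserializationSchema<Row> {

        @Override
        public Row deserialize(byte[] message) {
            String[] parts = new String(message, StandardCharsets.UTF_8).split(",", -1);
            return Row.of(parts[0], Long.valueOf(parts[1].trim()));
        }

        @Override
        public boolean isEndOfStream(Row nextElement) {
            return false;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(
                new TypeInformation<?>[] {BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO},
                new String[] {"word", "cnt"});
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder brokers
        props.setProperty("group.id", "my-group");                 // placeholder group

        DataStream<Row> csvStream = env.addSource(
            new FlinkKafkaConsumer011<>("my-topic", new SimpleCsvRowSchema(), props));

        // Register the pre-parsed stream as a table; SQL queries can now run against it.
        tableEnv.registerDataStream("kafka_csv_source", csvStream, "word, cnt");
    }
}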

You can follow [1] to get informed once the CSV format is supported. I'm sure it will be merged for Flink 1.8.

Regards,
Timo


