Is CSV format supported for Kafka in Flink 1.10?

Is CSV format supported for Kafka in Flink 1.10?

kant kodali
Hi,

Is the CSV format supported for Kafka in Flink 1.10? The error below says I need to set connector.type to 'filesystem', but the documentation says the format is supported for Kafka.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class Test {

    public static void main(String... args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);

        tableEnvironment
                .connect(
                        new Kafka()
                                .version("0.11")
                                .topic("test-topic1")
                )
                .withFormat(new Csv())
                .withSchema(new Schema().field("f0", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("kafka_source");

        Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
        tableEnvironment.toAppendStream(resultTable, Row.class).print();

        tableEnvironment.execute("Sample Job");
    }
}

This code generates the following error:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.property-version=1
connector.topic=test-topic1
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=f0
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 34 more


Re: Is CSV format supported for Kafka in Flink 1.10?

Jark Wu-3
Hi Kant,

CSV is supported for Kafka, but you need to download the flink-csv sql jar and load it into the SQL CLI using `--library`, because the CSV format factory is implemented in a separate module and is not bundled by default.
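
For a packaged Java program like the one above, the equivalent fix is to put the flink-csv module on the classpath. A minimal sketch, assuming a Maven build and Flink 1.10.0 (match the version to your distribution):

    <!-- CSV format factory for the Table API; assumed coordinates, not bundled with Flink 1.10 by default -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>1.10.0</version>
    </dependency>

For the SQL CLI, the jar is passed at startup, assuming the sql-jar artifact published for 1.10:

    ./bin/sql-client.sh embedded --library /path/to/flink-csv-1.10.0-sql-jar.jar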

