Hi Leon,
The exception comes from the Hadoop side; it looks like some Hadoop dependencies are missing from the classpath.
Parquet needs Hadoop. Rather than adding Hadoop-related dependencies directly to the project, it is recommended to set HADOOP_CLASSPATH or to use the Flink shaded Hadoop uber jar [1].
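To confirm whether Hadoop is actually visible at runtime, a small check like the one below may help. It is only a sketch: the class `HadoopClasspathCheck` and its two helper methods are hypothetical names, not part of Flink or Hadoop. It assumes that Hadoop 2.x finds FileSystem implementations through a `META-INF/services/org.apache.hadoop.fs.FileSystem` entry, which, as far as I know, can get lost when jars are merged into a fat jar.

```java
import java.net.URL;

public class HadoopClasspathCheck {

    // Returns true if the named class can be located by this class's class loader.
    public static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, HadoopClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Returns true if the named resource is visible to this class's class loader.
    public static boolean hasResource(String resourceName) {
        URL url = HadoopClasspathCheck.class.getClassLoader().getResource(resourceName);
        return url != null;
    }

    public static void main(String[] args) {
        // Core Hadoop class that appears in the stack trace
        System.out.println("org.apache.hadoop.fs.FileSystem found: "
                + isOnClasspath("org.apache.hadoop.fs.FileSystem"));
        // Implementation that normally handles the "file" scheme
        System.out.println("org.apache.hadoop.fs.LocalFileSystem found: "
                + isOnClasspath("org.apache.hadoop.fs.LocalFileSystem"));
        // Service registration file (assumed discovery mechanism, see the note above)
        System.out.println("FileSystem service file found: "
                + hasResource("META-INF/services/org.apache.hadoop.fs.FileSystem"));
    }
}
```

Run it with the same classpath as the job. If any of the three lines prints false, that points to the missing or incomplete Hadoop dependencies described above.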
Best,
Leonard Xu
Hi,
I am trying the Table API in Flink 1.11:
tEnv.executeSql("CREATE TABLE people (\n" +
" id INT,\n" +
" name STRING\n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = 'file:///data/test.parquet',\n" +
" 'format' = 'parquet',\n" +
" 'properties.bootstrap.servers' = 'kafka:9092'\n" +
")");
It failed:
jobmanager_1 | java.io.IOException: No FileSystem for scheme: file
jobmanager_1 | at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2799) ~[?:?]
jobmanager_1 | at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810) ~[?:?]
jobmanager_1 | at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100) ~[?:?]
jobmanager_1 | at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849) ~[?:?]
jobmanager_1 | at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831) ~[?:?]
jobmanager_1 | at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389) ~[?:?]
jobmanager_1 | at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356) ~[?:?]
jobmanager_1 | at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38) ~[?:?]
jobmanager_1 | at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448) ~[?:?]
jobmanager_1 | at org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:125) ~[?:?]
jobmanager_1 | at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:131) ~[?:?]
jobmanager_1 | at org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory$ParquetInputFormat.open(ParquetFileSystemFormatFactory.java:173) ~[?:?]
jobmanager_1 | at org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory$ParquetInputFormat.open(ParquetFileSystemFormatFactory.java:128) ~[?:?]
jobmanager_1 | at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
job
The pom.xml has the following dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.4</version>
</dependency>
Any idea? Thanks!
Regards
Leon