I am trying to experiment how to use Hive to store metadata along using Flink SQL. I am running Hive inside a docker container locally, and running Flink SQL program through IDE.
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive/conf/";
String version = "2.3.6";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("default");
but then I encountered:
Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.flink.sql.parser.validate.FlinkSqlConformance can not implement org.apache.calcite.sql.validate.SqlConformance, because it is not an interface (org.apache.calcite.sql.validate.SqlConformance is in unnamed module of loader 'app')
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:802)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:700)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:623)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:113)
at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:47)
at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:139)
at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
at datatype.test.StreamMain.main(StreamMain.java:25).
The Exception is thrown when executing: StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);
my pom dependencies include following, flink.version == 1.12.0, scala.binary.version == 2.1.1
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Thanks a lot!
Eleanore