Flink sql using Hive for metastore throws Exception

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink sql using Hive for metastore throws Exception

Eleanore Jin
Hi experts,
I am trying to experiment how to use Hive to store metadata along using Flink SQL. I am running Hive inside a docker container locally, and running Flink SQL program through IDE.

Flink version 1.12.0

the sample code looks like:
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);
String name = "myhive";   
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive/conf/";
String version = "2.3.6";


HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("default");
but then I encountered:
Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.flink.sql.parser.validate.FlinkSqlConformance can not implement org.apache.calcite.sql.validate.SqlConformance, because it is not an interface (org.apache.calcite.sql.validate.SqlConformance is in unnamed module of loader 'app')
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:802)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:700)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:623)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:113)
at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:47)
at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:139)
at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
at datatype.test.StreamMain.main(StreamMain.java:25).
The Exception is thrown when executing: StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);
my pom dependencies include following, flink.version == 1.12.0, scala.binary.version == 2.1.1
<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.6</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

</dependencies>
Thanks a lot!
Eleanore

Reply | Threaded
Open this post in threaded view
|

Re: Flink sql using Hive for metastore throws Exception

rmetzger0
I'm not sure if this is related, but you are mixing scala 2.11 and 2.12 dependencies (and mentioning scala 2.1.1 dependencies).

On Tue, Feb 2, 2021 at 8:32 AM Eleanore Jin <[hidden email]> wrote:
Hi experts,
I am trying to experiment how to use Hive to store metadata along using Flink SQL. I am running Hive inside a docker container locally, and running Flink SQL program through IDE.

Flink version 1.12.0

the sample code looks like:
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);
String name = "myhive";   
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive/conf/";
String version = "2.3.6";


HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("default");
but then I encountered:
Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.flink.sql.parser.validate.FlinkSqlConformance can not implement org.apache.calcite.sql.validate.SqlConformance, because it is not an interface (org.apache.calcite.sql.validate.SqlConformance is in unnamed module of loader 'app')
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:802)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:700)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:623)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:113)
at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:47)
at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:139)
at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
at datatype.test.StreamMain.main(StreamMain.java:25).
The Exception is thrown when executing: StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);
my pom dependencies include following, flink.version == 1.12.0, scala.binary.version == 2.1.1
<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.6</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

</dependencies>
Thanks a lot!
Eleanore