hi:
I am using flink's table api, I receive data from kafka, then register it as a table, then I use sql statement to process, and finally convert the result back to a stream, write to a directory, the code looks like this: def main(args: Array[String]): Unit = { val sEnv = StreamExecutionEnvironment.getExecutionEnvironment sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(sEnv) tEnv.connect( new Kafka() .version("0.11") .topic("user") .startFromEarliest() .property("zookeeper.connect", "") .property("bootstrap.servers", "") ) .withFormat( new Json() .failOnMissingField(false) .deriveSchema() //使用表的 schema ) .withSchema( new Schema() .field("username_skey", Types.STRING) ) .inAppendMode() .registerTableSource("user") val userTest: Table = tEnv.sqlQuery( """ select ** form ** join **"".stripMargin) val endStream = tEnv.toRetractStream[Row](userTest) endStream.writeAsText("/tmp/sqlres",WriteMode.OVERWRITE) sEnv.execute("Test_New_Sign_Student") } I was successful in the local test, but when I submit the following command in the cluster, I get the following error: ======================================================= org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath. Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'. The following properties are requested: connector.properties.0.key=zookeeper.connect .... schema.9.name=roles schema.9.type=VARCHAR update-mode=append The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory at org.apache.flink.table.factories.TableFactoryService$.filterByFactoryClass(TableFactoryService.scala:176) at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:125) at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala) at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259) at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144) at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:50) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:44) at org.clay.test.Test_New_Sign_Student$.main(Test_New_Sign_Student.scala:64) at org.clay.test.Test_New_Sign_Student.main(Test_New_Sign_Student.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) =================================== Can someone tell me what caused this? I am very confused about this........ -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi, I've pulled in Timo, who can help you with your problem. Cheers, Till On Tue, Sep 25, 2018 at 12:02 PM clay4444 <[hidden email]> wrote: hi: |
hi Till:
thank you for your reply there are some comments: I summit my task to yarn with following command: ./bin/flink run -c org.clay.test.TestXXXXX flinkTools-1.0.jar my pon look like this: <properties> <flink.version>1.6.0</flink.version> <isLocal>provided</isLocal> </properties> <repositories> <repository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories> <dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> <dependency> <groupId>net.minidev</groupId> <artifactId>json-smart</artifactId> <version>2.3</version> </dependency> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-avro</artifactId> <version>1.8.1</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>${joda-time.version}</version> </dependency> <dependency> <groupId>org.json4s</groupId> <artifactId>json4s-jackson_2.11</artifactId> <version>3.2.11</version> <exclusions> <exclusion> <artifactId>jackson-databind</artifactId> <groupId>com.fasterxml.jackson.core</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.json4s</groupId> <artifactId>json4s-native_2.11</artifactId> <version>3.2.11</version> </dependency> <dependency> <groupId>org.json4s</groupId> <artifactId>json4s-core_2.11</artifactId> <version>3.2.11</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-scala_2.11</artifactId> <version>2.6.5</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.6.5</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.3</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>${flink.version}</version> <scope>${isLocal}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> <scope>${isLocal}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime_2.11</artifactId> <version>${flink.version}</version> <scope>${isLocal}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hbase_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> <version>4.4.10</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0</version> <scope>${isLocal}</scope> <exclusions> <exclusion> <artifactId>xml-apis</artifactId> <groupId>xml-apis</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> <scope>${isLocal}</scope> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <configuration> <downloadSources>true</downloadSources> <buildcommands> <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> </buildcommands> <additionalProjectnatures> <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> </additionalProjectnatures> <classpathContainers> <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> </classpathContainers> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.0.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.clay.test.Test_New_Sign_Student</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> as the document, I have copy ./opt/flink-Table to ./lib/ -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
In reply to this post by Till Rohrmann
hi Till:
I have solve the problem, this reason is the flink-json which is add to pom didn't work must copy the flink-json-xxx.jar to flink path ./lib/ ....... -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi,
actually it should not be neccessary to put the flink-json format into /lib. Is the class `org.apache.flink.formats.json.JsonRowFormatFactory` present in the jar file you are creating with Maven? There should also be an entry in `META_INF/services/org.apache.flink.table.factories.TableFactory` for this class. Thanks for reporting this. Regards, Timo Am 26.09.18 um 06:25 schrieb clay4444: > hi Till: > > I have solve the problem, > this reason is the flink-json which is add to pom didn't work > > must copy the flink-json-xxx.jar to flink path ./lib/ > ....... > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Free forum by Nabble | Edit this page |