Flink 1.11 Table API cannot process Avro

Flink 1.11 Table API cannot process Avro

Lian Jiang
Hi,

According to https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html, Avro is supported by the filesystem connector in the Table API, but the code below fails:
tEnv.executeSql("CREATE TABLE people (\n" +
    "  id INT,\n" +
    "  name STRING\n" +
    ") WITH (\n" +
    "  'connector' = 'filesystem',\n" +
    "  'path' = 'file:///data/test.avro',\n" +
    "  'format' = 'avro',\n" +
    "  'record-class' = 'avro.Person',\n" +
    "  'property-version' = '1',\n" +
    "  'properties.bootstrap.servers' = 'kafka:9092'\n" +
    ")");

But got:
 Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find any factory for identifier 'avro' that implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
jobmanager_1      |
jobmanager_1      | Available factory identifiers are:
jobmanager_1      |
jobmanager_1      | csv
jobmanager_1      | json
jobmanager_1      | parquet
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     ... 10 more
jobmanager_1      | Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'avro' that implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.

Any idea? Thanks!

Regards
Leon





Re: Flink 1.11 Table API cannot process Avro

Jörn Franke
You are missing additional dependencies.




Re: Flink 1.11 Table API cannot process Avro

Lian Jiang
Thanks Jörn!

I added the documented dependency in my pom.xml file:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.11.0</version>
</dependency>

The newly generated jar does contain the factory classes:

$ jar tf target//spend-report-1.0.0.jar | grep FileSystemFormatFactory
org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.class
org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory$ParquetInputFormat.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroInputFormat.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory$1.class

But I still get the same error. Is anything else missing? Thanks. Regards!


More detailed exception:
jobmanager_1      | Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find any factory for identifier 'avro' that implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
jobmanager_1      |
jobmanager_1      | Available factory identifiers are:
jobmanager_1      |
jobmanager_1      | csv
jobmanager_1      | json
jobmanager_1      | parquet
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     ... 10 more
jobmanager_1      | Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'avro' that implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
jobmanager_1      |
jobmanager_1      | Available factory identifiers are:
jobmanager_1      |
jobmanager_1      | csv
jobmanager_1      | json
jobmanager_1      | parquet
jobmanager_1      |     at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.filesystem.FileSystemTableFactory.createFormatFactory(FileSystemTableFactory.java:112) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.filesystem.FileSystemTableSource.getInputFormat(FileSystemTableSource.java:143) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.filesystem.FileSystemTableSource.getDataStream(FileSystemTableSource.java:127) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.plan.nodes.physical.PhysicalLegacyTableSourceScan.getSourceTransformation(PhysicalLegacyTableSourceScan.scala:82) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:98) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.playgrounds.spendreport.SpendReport.localavro_mysql(SpendReport.java:220) ~[?:?]
jobmanager_1      |     at org.apache.flink.playgrounds.spendreport.SpendReport.main(SpendReport.java:31) ~[?:?]
jobmanager_1      |     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252]
jobmanager_1      |     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_252]
jobmanager_1      |     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_252]
jobmanager_1      |     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
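For context on why the factory class can be present in the jar yet still not be found: Flink discovers table factories through Java's ServiceLoader, which reads provider entries from META-INF/services, so the .class file alone is not enough. Below is a minimal JDK-only sketch of that discovery mechanism, using java.nio.file.spi.FileSystemProvider as a stand-in SPI (this is not Flink's actual factory interface, just an illustration of the pattern):

```java
import java.nio.file.spi.FileSystemProvider;
import java.util.ServiceLoader;

public class ServiceDiscoveryDemo {

    // Count implementations of an SPI that ServiceLoader can discover.
    // Flink's FactoryUtil relies on the same META-INF/services mechanism
    // to locate format factories, so a factory class that is on the
    // classpath but has no (or a clobbered) service entry is invisible.
    public static int discoverProviders() {
        int found = 0;
        for (FileSystemProvider p : ServiceLoader.load(FileSystemProvider.class)) {
            System.out.println("discovered provider for scheme: " + p.getScheme());
            found++;
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println("providers discovered: " + discoverProviders());
    }
}
```

If the service registration entry is dropped while a fat jar is built, the loop body never runs even though the implementation classes are on the classpath — which matches the symptom of the jar listing above.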


Re: Flink 1.11 Table API cannot process Avro

Lian Jiang
I am using the Flink playground as the base.

I noticed "PhysicalLegacyTableSourceScan" in the stack trace; not sure whether this is related. Thanks. Regards!


Re: Flink 1.11 Table API cannot process Avro

Shengkai Fang
It seems that you haven't added all the required dependencies.

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
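Another thing worth checking, since the earlier jar listing shows AvroFileSystemFormatFactory is already inside the fat jar: factories are discovered via META-INF/services files, and the maven-shade-plugin overwrites rather than merges those files from different format jars unless a ServicesResourceTransformer is configured. A sketch of that shade configuration (the plugin version is an assumption; adapt it to the build section the playground pom already has):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- Merge META-INF/services entries from all dependency jars
               so ServiceLoader can still discover every factory. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```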

jobmanager_1      |     at org.apache.flink.playgrounds.spendreport.SpendReport.localavro_mysql(SpendReport.java:220) ~[?:?]
jobmanager_1      |     at org.apache.flink.playgrounds.spendreport.SpendReport.main(SpendReport.java:31) ~[?:?]
jobmanager_1      |     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252]
jobmanager_1      |     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_252]
jobmanager_1      |     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_252]
jobmanager_1      |     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-dist_2.11-1.11.0.jar:1.11.0]

On Sat, Jul 11, 2020 at 12:33 AM Jörn Franke <[hidden email]> wrote:
You are missing additional dependencies 


On 11.07.2020, at 04:16, Lian Jiang <[hidden email]> wrote:


Hi,

According to https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html, avro is supported for table API but below code failed:
tEnv.executeSql("CREATE TABLE people (\n" +
" id INT,\n" +
" name STRING\n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = 'file:///data/test.avro',\n" +
" 'format' = 'avro',\n" +
" 'record-class' = 'avro.Person',\n" +
" 'property-version' = '1',\n" +
" 'properties.bootstrap.servers' = 'kafka:9092'\n" +
")");

But got:
 Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find any factory for identifier 'avro' that implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
jobmanager_1      |
jobmanager_1      | Available factory identifiers are:
jobmanager_1      |
jobmanager_1      | csv
jobmanager_1      | json
jobmanager_1      | parquet
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     ... 10 more
jobmanager_1      | Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'avro' that implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.

Any idea? Thanks!

Regards
Leon







Re: Flink 1.11 Table API cannot process Avro

Leonard Xu
In reply to this post by Lian Jiang
Hi, Jiang


jobmanager_1      | Available factory identifiers are:
jobmanager_1      | 
jobmanager_1      | csv
jobmanager_1      | json
jobmanager_1      | parquet

After adding the flink-avro dependency, did you restart your cluster/SQL client? From the log, it looks like the flink-avro dependency was not loaded properly.
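For example, on a standalone cluster the steps would look roughly like this (illustrative only — the paths and the Flink version are assumptions, adjust them to your installation):

```shell
# Sketch: put the format jar where the cluster classloader picks it up,
# then restart so it is actually loaded.
wget -P "$FLINK_HOME/lib" \
  https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.0/flink-avro-1.11.0.jar
"$FLINK_HOME/bin/stop-cluster.sh"
"$FLINK_HOME/bin/start-cluster.sh"
```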


Best,
Leonard Xu

Re: Flink 1.11 Table API cannot process Avro

Lian Jiang
Thanks guys.

I missed the runtime dependencies. After adding the lines below to https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/Dockerfile, the original "Could not find any factory for identifier" issue is gone.

wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.0/flink-avro-1.11.0.jar; \
wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar; \
wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar; \
wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar; \
wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar;


However, I now get various NoSuchMethodException errors related to JsonNode/JsonNull/GenericRecord/...  The most recent exception is:
 
 java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
jobmanager_1      |     at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) ~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) ~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) ~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) ~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) ~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) ~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) ~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) ~[flink-avro-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:200) ~[flink-avro-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.0.jar:1.11.0]

Is there an uber jar or a list of runtime dependencies so that developers can easily make the above Flink SQL for Avro example work? Thanks.






Re: Flink 1.11 Table API cannot process Avro

Jark Wu-3
From the latest exception message, it seems the Avro factory problem has been resolved.
The new exception indicates that you don't have the proper Apache Avro dependencies (flink-avro doesn't bundle Apache Avro),
so you have to add Apache Avro to your project dependencies, or export HADOOP_CLASSPATH if Hadoop is installed in your environment.

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
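If you go the HADOOP_CLASSPATH route instead, it is just something like this (assumes a local Hadoop installation with the `hadoop` CLI on the PATH):

```shell
# Sketch: make the Hadoop-provided jars (which include Avro) visible
# to Flink, then start the cluster / submit the job from the same shell.
export HADOOP_CLASSPATH=$(hadoop classpath)
```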

Best,
Jark


Re: Flink 1.11 Table API cannot process Avro

Leonard Xu
In reply to this post by Lian Jiang
Hi, Jiang
Is there a uber jar or a list of runtime dependencies so that developers can easily make the above example of Flink SQL for avro work? Thanks.

The dependency list for using Avro in Flink SQL is simple, and there is no uber jar AFAIK: you only need to add the `flink-avro` and `avro` dependencies. The `avro` dependency is managed, which means you do not need to add it if your dependency list already contains an `avro` dependency. I wrote a simple demo[1], hope it can help you.
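To double-check which `avro` version Maven actually resolves (and to spot clashes between the managed version and one you added yourself), you can do something like this (illustrative, assumes a Maven project):

```shell
# Sketch: print only the avro entries of the resolved dependency tree.
mvn dependency:tree -Dincludes=org.apache.avro:avro
```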

Best,
Leonard Xu



 

Re: Flink 1.11 Table API cannot process Avro

Lian Jiang
Thanks Leonard and Jark.

Here is my repo for reproducing the issue: https://bitbucket.org/jiangok/flink-playgrounds/src/0d242a51f02083711218d3810267117e6ce4260c/table-walkthrough/pom.xml#lines-131. As you can see, my pom.xml already includes the flink-avro and avro dependencies.

You can run this repro by:

git clone [hidden email]:jiangok/flink-playgrounds.git
cd flink-playgrounds/table-walkthrough
. scripts/ops.sh   # source some helper commands
rebuild            # build the artifacts and Docker images, then run them
log jobmanager     # print the job manager log, which contains the exception

Appreciate very much for your help!







