I've been trying to write to the Avro format with PyFlink 1.12.2 on Ubuntu. I've tested my code with an iterator writing to CSV and everything works as expected. Reading through the Flink documentation, I see that I should add jar dependencies to work with Avro. I downloaded three jar files that I believe are required for Avro and registered them like so:

    table_env \
        .get_config() \
        .get_configuration() \
        .set_string(
            "pipeline.jars",
            rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;"
            rf"file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;"
            rf"file:///{os.getcwd()}/lib/avro-1.10.2.jar"
        )

I suspect I'm not loading the jar files correctly, but it's unclear what I'm supposed to do, as I'm not familiar with Java. When I switch the sink format to avro, I get some unexpected errors:

    Py4JJavaError: An error occurred while calling o746.executeInsert.
    : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
        at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
        at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
        at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
        at org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
        at org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
        at org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
        at org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
        at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
        at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
        at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
        at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
        at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
        at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)

My sample code is as follows:

    sink_ddl = f"""
        create table Results(
            a STRING,
            b STRING
        ) with (
            'connector' = 'filesystem',
            'path' = '{result_path}',
            'format' = 'avro'
        )
    """
    table_env.execute_sql(sink_ddl)
    table.execute_insert("Results").wait()

Could someone help or point me in the right direction to look?
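[Editor's note: a `NoClassDefFoundError` like the one above often means one of the registered jar paths simply doesn't exist, and Flink only surfaces that at job translation time. The small helper below is a sketch added for illustration (the `lib` directory and jar names mirror the snippet above); it builds the semicolon-separated `pipeline.jars` value and fails fast on a missing file:]

```python
import os


def build_pipeline_jars(jar_dir, jar_names):
    """Build the value for Flink's 'pipeline.jars' configuration option.

    Flink expects a semicolon-separated list of file:// URLs. A path that
    does not exist only shows up much later as a NoClassDefFoundError, so
    raise immediately here instead.
    """
    urls = []
    for name in jar_names:
        path = os.path.abspath(os.path.join(jar_dir, name))
        if not os.path.isfile(path):
            raise FileNotFoundError(f"jar not found: {path}")
        # abspath starts with '/' on Linux; strip it so we emit file:///...
        urls.append("file:///" + path.lstrip("/"))
    return ";".join(urls)


# Hypothetical usage with a pyflink TableEnvironment:
# table_env.get_config().get_configuration().set_string(
#     "pipeline.jars",
#     build_pipeline_jars("lib", ["flink-sql-avro-1.12.2.jar"]),
# )
```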
I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar, since the flink-sql-avro jar already bundles the Avro dependencies. Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try again?
Regards, Dian
Hi Dian, I tried your suggestion but got the same error message, unfortunately. I also tried the file:/ and file:// prefixes with the same error, so I'm not sure what's going on. I assume writing to Avro works fine in Java and Scala? Eddie On Sat, Apr 24, 2021 at 10:03 PM Dian Fu <[hidden email]> wrote:
Hi Eddie,
I have tried your program with the following changes and it could execute successfully:
- Replace `rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"` with `rf"file:///Users/dianfu/code/src/apache/flink/flink-sql-avro-1.12.3.jar"`
- Use flink-sql-avro-1.12.3.jar [1] instead of flink-sql-avro-1.12.2.jar, as I encountered issue FLINK-21012 [2], which has been addressed in 1.12.3.

For your problem, I suspect `file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar` doesn't really exist. Could you double check that?

Regards,
Dian
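[Editor's note: besides checking that the jar file exists, one can check that it actually contains the class the JVM failed to load, since a jar is just a zip archive. The sketch below is added for illustration and is not from the thread; note that uber jars like flink-sql-avro may relocate ("shade") Avro classes under a different package prefix, which is why it matches on the entry suffix:]

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar bundles the given class.

    class_name is dotted, e.g. 'org.apache.avro.io.DatumWriter'. Matching
    on the suffix also catches shaded/relocated copies of the class, e.g.
    something like '...shaded.org.apache.avro.io.DatumWriter' (the exact
    relocation prefix, if any, depends on the jar).
    """
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return any(name.endswith(entry) for name in jar.namelist())


# Hypothetical usage:
# jar_contains_class("lib/flink-sql-avro-1.12.3.jar",
#                    "org.apache.avro.io.DatumWriter")
```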
Hi Dian, Thanks for trying it out; that ruled out a problem with the Python code. I double-checked the jar path and included only the jar you referenced, without any luck. However, I tried creating a Python 3.7 environment for pyflink (I had been on 3.8) and the code worked without any errors! On Sun, Apr 25, 2021, 10:09 PM Dian Fu <[hidden email]> wrote:
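[Editor's note: since the eventual fix was the interpreter version, a small guard at startup can surface such a mismatch early. This is a sketch, not from the thread, and the pinned version reflects only what worked for this poster (Python 3.7 with PyFlink 1.12.2); check the PyFlink docs for the officially supported range:]

```python
import sys

# Python 3.7 is what worked in this thread for PyFlink 1.12.2; adjust to
# whatever range your PyFlink version officially supports.
REQUIRED_MINOR_VERSION = (3, 7)


def check_python_version(version_info=sys.version_info,
                         required=REQUIRED_MINOR_VERSION):
    """Return True if the interpreter's (major, minor) matches `required`."""
    return (version_info[0], version_info[1]) == required


# Hypothetical usage before building the TableEnvironment:
# if not check_python_version():
#     raise RuntimeError(f"unsupported Python {sys.version_info[:2]}")
```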