Writing to Avro from pyflink


ryeyoo
I've been trying to write to the Avro format with PyFlink 1.12.2 on Ubuntu. I've tested my code with an iterator writing to CSV, and everything works as expected. Reading through the Flink documentation, I see that I should add jar dependencies to work with Avro. I downloaded the three jar files that I believe are required for Avro and loaded them like so:

table_env \
    .get_config() \
    .get_configuration() \
    .set_string(
        "pipeline.jars",
        f"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;"
        f"file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;"
        f"file:///{os.getcwd()}/lib/avro-1.10.2.jar"
    )
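
For reference, pipeline.jars takes a semicolon-separated list of file:// URLs, so the same value can be assembled with pathlib, which makes the prefixes harder to get wrong. This is only a sketch assuming the ./lib layout above; jar_urls is a hypothetical helper, not a Flink API:

import os
from pathlib import Path

def jar_urls(*names):
    # Resolve each jar under ./lib and render it as a file:// URL,
    # then join with the semicolons that pipeline.jars expects.
    lib = Path(os.getcwd()) / "lib"
    return ";".join((lib / name).resolve().as_uri() for name in names)

jars = jar_urls(
    "flink-sql-avro-1.12.2.jar",
    "flink-avro-1.12.2.jar",
    "avro-1.10.2.jar",
)
# then pass `jars` to set_string("pipeline.jars", ...) as above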

I suspect I'm not loading the jar files correctly, but it's unclear what I'm supposed to do, as I'm not familiar with Java. When I switch the sink format to Avro, I get some unexpected errors:
Py4JJavaError: An error occurred while calling o746.executeInsert.
: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
	at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
	at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
	at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
	at org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
	at org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
	at org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
	at org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
	at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
	at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
	at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
	at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
	at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
	at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
	at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)

My sample code is as follows:

import os

from pyflink.table import DataTypes, BatchTableEnvironment, EnvironmentSettings

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

table_env \
    .get_config() \
    .get_configuration() \
    .set_string(
        "pipeline.jars",
        f"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;"
        f"file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;"
        f"file:///{os.getcwd()}/lib/avro-1.10.2.jar"
    )

# Sample rows for illustration; `a` was defined elsewhere in the original script.
a = [('hello', 'world'), ('foo', 'bar')]

table = table_env.from_elements(
    a,
    schema=DataTypes.ROW([
        DataTypes.FIELD('text', DataTypes.STRING()),
        DataTypes.FIELD('text1', DataTypes.STRING())
    ])
)

# Output directory for illustration; `result_path` was defined elsewhere in the original script.
result_path = '/tmp/avro-results'

sink_ddl = f"""
    CREATE TABLE Results (
        a STRING,
        b STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{result_path}',
        'format' = 'avro'
    )
"""

table_env.execute_sql(sink_ddl)
table.execute_insert("Results").wait()

Could someone help, or point me in the right direction?

Re: Writing to Avro from pyflink

Dian Fu
I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar. Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try again?
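
That is, something like this minimal sketch, keeping the ./lib layout from the original snippet (flink-sql-avro is a fat jar that already bundles flink-avro and avro, so the other two entries should be redundant):

import os

table_env \
    .get_config() \
    .get_configuration() \
    .set_string(
        "pipeline.jars",
        f"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar"
    )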

Regards,
Dian


Re: Writing to Avro from pyflink

ryeyoo
Hi Dian,

I tried your suggestion but unfortunately got the same error message. I also tried the file:/ and file:// prefixes, with the same error. I'm not sure what's going on; I assume writing to Avro works fine in Java and Scala?

Eddie


Re: Writing to Avro from pyflink

Dian Fu
Hi Eddie,

I tried your program with the following changes, and it executed successfully:
- Replace `rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"` with `rf"file:///Users/dianfu/code/src/apache/flink/flink-sql-avro-1.12.3.jar"`
- Use flink-sql-avro-1.12.3.jar [1] instead of flink-sql-avro-1.12.2.jar, as I encountered issue FLINK-21012 [2], which has been addressed in 1.12.3

For your problem, I suspect that `file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar` doesn't actually exist. Could you double-check that?
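
For example, a quick check along these lines (building the path exactly as the script does; just a sketch):

import os

jar = f"{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar"
# os.getcwd() depends on where the Python process was launched (e.g. a
# notebook server), so the URL handed to pipeline.jars may point at a
# file that is not actually there.
print(jar, "exists:", os.path.isfile(jar))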


Regards,
Dian


Re: Writing to Avro from pyflink

ryeyoo
Hi Dian,

Thanks for trying it out; that ruled out a problem with the Python code. I double-checked the jar path and included only the jar you referenced, without any luck. However, I tried creating a Python 3.7 environment for PyFlink (I had been on 3.8), and the code worked without any errors!
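
For anyone who hits the same wall, a small sanity check of which interpreter PyFlink is actually running under may help; the 3.7-works/3.8-fails split is only what was observed in this thread, not a documented limit:

import sys

# Print the interpreter path and version; in this thread the same code
# failed under a Python 3.8 environment and succeeded under a fresh 3.7 one.
print(sys.executable)
print(sys.version)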

