PyFlink:



Schneider, Thilo

Dear list,

 

I’m having my first go at Flink and have quickly stumbled over a problem I can’t find an easy way around. I hope you can help me.

 

I am trying to read an Avro-encoded Kafka topic. When I do, I get a NoClassDefFoundError: org.apache.avro.SchemaBuilder could not be initialized, although this class should be included in the provided avro-1.9.2.jar. The jar is definitely picked up, since I get “java.lang.ClassNotFoundException: org.apache.avro.generic.IndexedRecord” if I remove that dependency.
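One quick way to check whether the class really is in the shipped jars is to look inside them, since jars are ordinary zip archives. Note that “Could not initialize class” is the JVM’s message when a class was found but an earlier attempt to run its static initializer failed, so finding SchemaBuilder in avro-1.9.2.jar would point at something that initializer needs (for example a missing dependency) rather than at the Avro jar itself. The helper below is a hypothetical diagnostic sketch (`jars_containing_class` is not part of Flink or PyFlink) that uses only Python’s standard `zipfile` module:

```python
import zipfile
from pathlib import Path


def jars_containing_class(jar_paths, class_name):
    """Return the names of the jars in jar_paths that contain class_name.

    class_name is a fully qualified Java name such as
    "org.apache.avro.SchemaBuilder"; inside a jar it is stored as the
    entry "org/apache/avro/SchemaBuilder.class".
    """
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in jar_paths:
        # A jar is a zip archive, so its entries can be listed directly
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(Path(jar).name)
    return hits
```

For example, `jars_containing_class(["/path/to/my/jars/avro-1.9.2.jar"], "org.apache.avro.SchemaBuilder")` should return `["avro-1.9.2.jar"]` if the class is packaged there; in that case it may be worth searching the TaskManager log for the earlier error that caused the failed initialization.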

 

[…]

Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder
                at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:240)
                at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:179)
                at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.open(AvroRowDataDeserializationSchema.java:136)
                at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:694)
                at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
                at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
                at java.lang.Thread.run(Thread.java:748)

 

The code I am using is the following:

 

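from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Streaming TableEnvironment on the Blink planner (Flink 1.11 API)
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Jars to ship with the job: Kafka SQL connector, Avro SQL format, Avro itself
jars = [
    "flink-sql-connector-kafka_2.11-1.11.2.jar",
    "flink-avro-1.11.2-sql-jar.jar",
    "avro-1.9.2.jar",
]

jar_base_path = "file:///path/to/my/jars/"
# 'pipeline.jars' takes a ';'-separated list of jar URLs
table_env.get_config().get_configuration().set_string(
    'pipeline.jars', ';'.join([jar_base_path + j for j in jars]))

# Kafka-backed table whose records are Avro-encoded
table_env.execute_sql("""
CREATE TABLE test (
    a STRING,
    b INT,
    c TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',
    'properties.bootstrap.servers' = 'kafka.broker.com:9192',
    'properties.group.id' = 'something',
    'format' = 'avro'
)
""")

# Read the table and collect it into a pandas DataFrame
test = table_env.from_path('test')
test.to_pandas()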
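Because the jar list is assembled by plain string concatenation, a typo in a file name or base path only shows up once the job runs. The following is a small sketch that resolves each jar locally, fails early if one is missing, and produces the same ';'-separated list of file:// URIs; `pipeline_jars_value` is a hypothetical helper name, and the jars are assumed to sit in one local directory as in the script above:

```python
from pathlib import Path


def pipeline_jars_value(jar_dir, jar_names):
    """Build a value for the 'pipeline.jars' option from local jar files.

    Each jar is resolved to an absolute path, checked for existence and
    converted to a file:// URI; the URIs are joined with ';'.
    """
    base = Path(jar_dir).expanduser().resolve()
    uris = []
    missing = []
    for name in jar_names:
        path = base / name
        if path.is_file():
            uris.append(path.as_uri())  # as_uri() needs an absolute path
        else:
            missing.append(str(path))
    if missing:
        raise FileNotFoundError("jar(s) not found: " + ", ".join(missing))
    return ";".join(uris)
```

It would stand in for the `';'.join(...)` expression, e.g. `set_string('pipeline.jars', pipeline_jars_value('/path/to/my/jars', jars))`. It only verifies that the listed files exist; it cannot tell whether their own dependencies are complete.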

 

Any help would be appreciated.

 

Thanks in advance

Thilo


Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang

Re: PyFlink:

Dian Fu
Hi Thilo,

You also need to add the transitive dependencies of Avro, e.g. jackson-core-asl, jackson-mapper-asl, etc. An uber jar will be provided starting with 1.12; see https://issues.apache.org/jira/browse/FLINK-18802 for more details.
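When several transitive dependencies are needed, listing each jar by hand gets tedious. The following is a sketch, assuming all required jars (the connector and format jars, Avro, and its transitive dependencies) have been collected into one local directory, for instance with Maven’s `dependency:copy-dependencies` goal; `pipeline_jars_from_dir` is a hypothetical helper, not a PyFlink API:

```python
from pathlib import Path


def pipeline_jars_from_dir(jar_dir):
    """Return a 'pipeline.jars' value listing every *.jar in jar_dir.

    The jars are sorted by path so the resulting string is deterministic,
    converted to file:// URIs and joined with ';'.
    """
    jars = sorted(Path(jar_dir).expanduser().resolve().glob("*.jar"))
    if not jars:
        raise FileNotFoundError(f"no .jar files in {jar_dir}")
    return ";".join(p.as_uri() for p in jars)
```

The result can be passed to `table_env.get_config().get_configuration().set_string('pipeline.jars', ...)` as in the original script. Globbing a whole directory is convenient, but it also ships any stale or duplicate versions left there, so the directory should contain only the intended jars.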

Regards,
Dian

In reply to: Schneider, Thilo <[hidden email]>, 17 October 2020, 5:25 AM

Dear list,
 
I’m having my first go at using Flink and quickly stumbled over a problem I find no easy way around. I hope you may help me.
 
I try to read an avro encoded kafka topic. Doing so, I do get a NoClassDefFoundError. Org.apache.avro.SchemaBuilder could not be found, but this should be included in the provided avro-1.9.2.jar. The jar is correctly picked up, as I do get “java.lang.ClassNotFoundException: org.apache.avro.generic.IndexedRecord” if I remove that dependency.
 
[…]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder
                at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:240)
                at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:179)
                at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.open(AvroRowDataDeserializationSchema.java:136)
                at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:694)
                at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
                at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
                at java.lang.Thread.run(Thread.java:748)
 
The code I am using is the following:
 
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(
environment_settings=env_settings)

jars = [
"flink-sql-connector-kafka_2.11-1.11.2.jar",
        
"flink-avro-1.11.2-sql-jar.jar",
        
"avro-1.9.2.jar"]

jar_base_path = 
"file:///path/to/my/jars/"
table_env.get_config().get_configuration().set_string('pipeline.jars',';'.join([jar_base_path + j for j in jars]))

table_env.execute_sql(
"""
CREATE TABLE test (
    
a STRING,
    
b INT,
    
c TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'my_topic',
'properties.bootstrap.servers' = 'kafka.broker.com:9192',
'properties.group.id' = 'something',
'format' = 'avro'
)
"""
)

test = table_env.from_path(
'test')
test.to_pandas()
 
Any help would be appreciated. 
 
Thanks in advance
Thilo

Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang