Hi guys,

Could you help me set up a Kafka sink connector using the Avro format? I'm using pyFlink 1.11, OpenJDK 11, and executing the job locally (python my_script.py).

My code is simple, with a call to a UDTF that for now only yields a fixed small binary. I use Kafka source and sink connectors. The source uses the Json format, which works as expected. The sink works with the Csv format:

(t_env.connect(  # declare the external system to connect to
    Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092")
        .property("zookeeper.connect", "localhost:2181")
    ) \
    .with_format(  # declare a format for this system
        Csv()  # Csv converts bytes to a base64 string....
    ) \
    .with_schema(  # declare the schema of the table
        Schema()
            .field("outf", DataTypes.BYTES())
    ) \
    .create_temporary_table("mySink"))

The Csv format converts the bytes (VARBINARY) to a base64 string, which works but is not what I want. With the Avro format, I just get errors:

- Just replacing Csv() in the code above with Avro(), and adding Avro's dependency with

conf = t_env.get_config().get_configuration()
conf.set_string("pipeline.jars",
    "file://<path_to_kafka_connector>.jar;file://<path_to_avro>.jar")

I get:
org.apache.flink.table.api.ValidationException: A definition of an Avro specific record class or Avro schema is required.

- After looking in the pyFlink source code, I also passed an avro_schema argument to the constructor, Avro(avro_schema=<my_schema_in_string>), and got:
java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter

- Using the SQL (DDL) declaration documented in [1], I get the same error.

I also have some doubts about how to create the schema, but I need Avro to work first.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html

Thanks,
Rodrigo
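P.S. For completeness, the Avro attempt looks roughly like the sketch below; the schema string and field names are only placeholders for my real ones:

avro_schema = """
{
  "type": "record",
  "name": "Output",
  "fields": [
    {"name": "outf", "type": "bytes"}
  ]
}
"""

(t_env.connect(
    Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092")
    ) \
    .with_format(
        Avro(avro_schema=avro_schema)  # this is where the ClassNotFoundException appears
    ) \
    .with_schema(
        Schema()
            .field("outf", DataTypes.BYTES())
    ) \
    .create_temporary_table("mySink"))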
Hi Rodrigo,

Flink doesn't provide an Avro uber jar, so you need to add all the dependency jars manually, such as avro, jackson-core-asl, jackson-mapper-asl and joda-time in release-1.11. However, I found a JIRA issue [1] from a few days ago that provides a default Avro uber jar.

[1] https://issues.apache.org/jira/browse/FLINK-18802

Best,
Xingbo
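P.S. As a rough sketch, pipeline.jars would then list every jar explicitly as a semicolon-separated set of file URLs; the file names and version numbers below are only illustrative, so match them to your Flink and Avro setup:

conf = t_env.get_config().get_configuration()
conf.set_string(
    "pipeline.jars",
    # Kafka connector + flink-avro format + Avro and its runtime dependencies
    "file:///path/to/flink-sql-connector-kafka_2.11-1.11.1.jar;"
    "file:///path/to/flink-avro-1.11.1.jar;"
    "file:///path/to/avro-1.8.2.jar;"
    "file:///path/to/jackson-core-asl-1.9.13.jar;"
    "file:///path/to/jackson-mapper-asl-1.9.13.jar;"
    "file:///path/to/joda-time-2.5.jar")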
Thank you Xingbo.
I've managed to get it working by adding the Avro jar and the three artifacts from the com.fasterxml.jackson.core group [1]. Is it also required to add the jackson-mapper-asl jar? As for joda-time, I suppose it won't be needed, since I won't use date types in my Avro schema.

About using Avro, I'd like to know whether pyFlink supports the Avro union type. I've found an old e-mail [2] that mentions it, but for Java. If pyFlink supports it, how would I declare the schema? Can I define the schema in an external .avsc file and import it, maybe through Avro(avro_schema)?

[1] https://search.maven.org/search?q=g:com.fasterxml.jackson.core
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/AVRO-Union-type-support-in-Flink-td14318.html

Thanks,
Rodrigo
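P.S. What I have in mind is something like the sketch below; the .avsc path and field names are made up, and I don't know yet whether the union type will be accepted:

# output.avsc would contain a union for the value field, e.g.:
# {
#   "type": "record",
#   "name": "Output",
#   "fields": [
#     {"name": "val", "type": ["long", "double", "string"]}
#   ]
# }

with open("/path/to/output.avsc") as f:
    avro_schema = f.read()

(t_env.connect(
    Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092"))
    .with_format(Avro(avro_schema=avro_schema))
    .with_schema(Schema().field("val", DataTypes.DOUBLE()))  # which DataType to declare for a union?
    .create_temporary_table("mySink"))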
Passing the schema through Avro(avro_schema) worked, but I had to pick a single type from the union to put into Schema.field(field_type) inside t_env.connect(). If my dict has long and double values and I declare Schema.field(DataTypes.DOUBLE()), all the int values are cast to double. My maps will also contain string values, and the job will crash with this configuration. Is there any workaround?

If not, I thought of serializing the record in the UDTF using the Python avro library and sending it as bytes to the sink. The problem is that all the serialization formats change the original bytes: the CSV format encodes bytes as base64; the JSON format adds a key, forming a key/value pair where the value is the binary; and the Avro format adds 3 bytes at the beginning of the message.

Thanks,
Rodrigo
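P.S. The workaround I'm considering looks roughly like this, using the avro Python package inside the UDTF; the schema and field names are illustrative:

import io
import avro.io
import avro.schema

# Illustrative record schema with a union-typed value field.
# Note: depending on the avro package version, the function is
# avro.schema.parse or avro.schema.Parse.
SCHEMA = avro.schema.parse("""
{
  "type": "record",
  "name": "Output",
  "fields": [{"name": "val", "type": ["long", "double", "string"]}]
}
""")

_writer = avro.io.DatumWriter(SCHEMA)

def to_avro_bytes(record):
    # Serialize one record to raw Avro binary (no container header or magic bytes).
    buf = io.BytesIO()
    _writer.write(record, avro.io.BinaryEncoder(buf))
    return buf.getvalue()

# The UDTF could then yield to_avro_bytes({"val": 42}) or
# to_avro_bytes({"val": "some text"}) into a column declared as DataTypes.BYTES().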
Hi Rodrigo,

For the connectors, PyFlink just wraps the Java implementation. I am not an expert on Avro and the corresponding connectors, but as far as I know, DataTypes really cannot declare the union type you mentioned. Regarding the bytes encoding you mentioned, I have no good suggestions either; I think we need an Avro expert to answer your question.

Best,
Xingbo