Avro format in pyFlink

Avro format in pyFlink

rodrigobrochado
Hi guys,

Could you help me set up a Kafka sink connector using the Avro format? I'm using pyFlink 1.11, OpenJDK 11, and executing the job locally (python my_script.py).

My code is simple: a call to a UDTF that, for now, only yields a small fixed binary. I use Kafka source and sink connectors. The source uses the Json format and works as expected. The sink works with the Csv format:

(t_env.connect(  # declare the external system to connect to
    Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092")
        .property("zookeeper.connect", "localhost:2181"))
    .with_format(  # declare a format for this system
        Csv())  # Csv converts bytes to a base64 string...
    .with_schema(  # declare the schema of the table
        Schema()
            .field("outf", DataTypes.BYTES()))
    .create_temporary_table("mySink"))

The Csv format converts the bytes (varbinary) to a base64 string, which is expected but not what I want.
With the Avro format, I just get errors:

- Just replacing Csv() in the code above with Avro(), and adding Avro's dependency with
conf = t_env.get_config().get_configuration()
conf.set_string("pipeline.jars", "file://<path_to_kafka_connector>.jar;file://<path_to_avro_format>.jar")
I got:
org.apache.flink.table.api.ValidationException: A definition of an Avro specific record class or Avro schema is required.

- After looking at the pyFlink source code, I also passed an avro_schema argument to the constructor, Avro(avro_schema=<my_schema_in_string>), and got
java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter

- Using the SQL (DDL) declaration documented in [1] (roughly the sketch below), I got the same error as above.
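
For reference, a minimal sketch of the DDL form I mean (same table name, topic and column as the connector-API example above; this assumes the Kafka connector and Avro format jars are already on the classpath):

# sketch only, mirroring the connector-API example above
t_env.execute_sql("""
    CREATE TABLE mySink (
        outf BYTES
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'output',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro'
    )
""")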

I also have some doubts about how to create the schema, but first I need the Avro format to work at all.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html

Thanks,
Rodrigo

Re: Avro format in pyFlink

Xingbo Huang
Hi Rodrigo,

Flink doesn't provide an Avro uber jar, so you need to add all the dependency jars manually, such as avro, jackson-core-asl, jackson-mapper-asl and joda-time for release-1.11.
However, I found that a JIRA[1] was opened a few days ago to provide a default Avro uber jar.
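
For example, something along these lines (the jar paths below are just placeholders; the actual artifacts are the ones listed above):

conf = t_env.get_config().get_configuration()
conf.set_string(
    "pipeline.jars",
    ";".join([
        "file:///path/to/flink-sql-connector-kafka.jar",
        "file:///path/to/flink-avro.jar",
        "file:///path/to/avro.jar",
        "file:///path/to/jackson-core-asl.jar",
        "file:///path/to/jackson-mapper-asl.jar",
        "file:///path/to/joda-time.jar",
    ]))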


[1] https://issues.apache.org/jira/browse/FLINK-18802

Best,
Xingbo


Re: Avro format in pyFlink

rodrigobrochado
Thank you, Xingbo.

I've managed to get it working by adding the Avro jar and the three artifacts from the *com.fasterxml.jackson.core* group [1]. Is it also required to add the jackson-mapper-asl jar? As for joda-time, I suppose it won't be required, since I won't use date types in my Avro schema.

About using Avro, I'd like to know whether pyFlink supports the Avro union type. I found an old e-mail [2] that mentions it, but for Java. If pyFlink supports it, how would I declare the schema? Can I define the schema in an external .avsc file and load it, maybe through Avro(avro_schema)? (See the sketch after the links below.)

[1] https://search.maven.org/search?q=g:com.fasterxml.jackson.core
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/AVRO-Union-type-support-in-Flink-td14318.html
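
Something like this sketch is what I have in mind (the .avsc path and its contents are hypothetical; the open question is what to declare in the table schema for a union field):

from pyflink.table import DataTypes
from pyflink.table.descriptors import Avro, Kafka, Schema

# sketch only: load an external .avsc and hand it to the Avro descriptor
with open("/path/to/my_schema.avsc") as f:
    avro_schema_str = f.read()

(t_env.connect(
    Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092"))
    .with_format(Avro(avro_schema=avro_schema_str))
    .with_schema(
        Schema()
            .field("outf", DataTypes.BYTES()))  # what would go here for a union field?
    .create_temporary_table("mySink"))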

Thanks,
Rodrigo


Re: Avro format in pyFlink

rodrigobrochado

Passing the schema through Avro(avro_schema) worked, but I had to pick a single type from the union to declare in Schema.field() inside t_env.connect(). If my dict has long and double values and I declare the field as DataTypes.DOUBLE(), all the int values are cast to double. My maps will also have string values, and the job will crash with this configuration.

Is there any workaround? If not, I thought of serializing the record in the UDTF using the python avro lib and sending it as bytes to the sink (a rough sketch is below). The problem is that all the sink serialization formats change the original payload: the Csv format encodes the bytes as base64; the Json format wraps the binary as the value of a key/value pair; and the Avro format adds 3 bytes at the beginning of the message.
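
A rough sketch of that fallback, using the python avro package (the record schema and field name here are made up; depending on the avro package version the parse function may be spelled Parse):

import io

import avro.schema
from avro.io import BinaryEncoder, DatumWriter

# made-up schema with a union field, just to illustrate
SCHEMA = avro.schema.parse(
    '{"type": "record", "name": "MyRecord",'
    ' "fields": [{"name": "value", "type": ["long", "double", "string"]}]}')
WRITER = DatumWriter(SCHEMA)

def to_avro_bytes(record):
    # serialize one dict to raw Avro binary; the UDTF would yield this as BYTES
    buf = io.BytesIO()
    WRITER.write(record, BinaryEncoder(buf))
    return buf.getvalue()

# e.g., inside the UDTF: yield to_avro_bytes({"value": 42})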

Thanks,
Rodrigo

Re: Avro format in pyFlink

Xingbo Huang
Hi Rodrigo,
For the connectors, PyFlink just wraps the Java implementation.
I am not an expert on Avro or the corresponding connectors, but as far as I know, DataTypes really cannot declare the union type you mentioned. Regarding the bytes encoding you mentioned, I don't have any good suggestions either.
I think we need an Avro expert to answer your question.

Best,
Xingbo
