ByteSerializationSchema in PyFlink

ByteSerializationSchema in PyFlink

Wouter Zorgdrager
Hi all,

I have a PyFlink job connected to a Kafka consumer and producer. I want to work directly with the bytes from and to Kafka, because I want to serialize/deserialize in my Python code rather than in the JVM. Therefore I can't use the SimpleStringSchema for (de)serialization (the messages aren't strings anyway). I've tried to create a TypeInformationSerializationSchema with Types.BYTE(); see the code snippet below:

class ByteSerializer(SerializationSchema, DeserializationSchema):
    def __init__(self, execution_environment):
        gate_way = get_gateway()

        j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
            Types.BYTE().get_java_type_info(),
            get_j_env_configuration(execution_environment),
        )
        SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
        DeserializationSchema.__init__(
            self, j_deserialization_schema=j_byte_string_schema
        )
The ByteSerializer is used like this:

return FlinkKafkaConsumer(
            ["client_request", "internal"],
            ByteSerializer(self.env._j_stream_execution_environment),
            {
                "bootstrap.servers": "localhost:9092",
                "auto.offset.reset": "latest",
                "group.id": str(uuid.uuid4()),
            },
        )

However, this does not seem to work. The error is thrown in the JVM, which makes it a bit hard to parse from my Python stack trace, but I think it boils down to this part:

answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat java.base/java.lang.Thread.run(Thread.java:834)\\n'
gateway_client = <py4j.java_gateway.GatewayClient object at 0x140c43550>
target_id = None
name = 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.
   
        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.
   
        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
                        format(target_id, ".", name), value)
                else:
>                   raise Py4JError(
                        "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
                        format(target_id, ".", name, value))
E                   py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema. Trace:
E                   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist
E                   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
E                   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
E                   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
E                   at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
E                   at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
E                   at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E                   at java.base/java.lang.Thread.run(Thread.java:834)

I hope you can help me out!

Thanks in advance,
Wouter
Re: ByteSerializationSchema in PyFlink

Dian Fu
Hi Wouter,

E                   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist

As the exception indicates, that constructor doesn't exist.


Could you try with the following:

```
j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
j_type_serializer = j_type_info.createSerializer(gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(j_type_info, j_type_serializer)
```

Regards,
Dian


Reply | Threaded
Open this post in threaded view
|

Re: ByteSerializationSchema in PyFlink

Wouter Zorgdrager
Hi Dian, all,

Thanks for your suggestion. Unfortunately, it does not seem to work. I get the following exception:

Caused by: java.lang.NegativeArraySizeException: -2147183315
at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
at org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

To be more precise, the messages in my Kafka topic are pickled Python objects; maybe that is the reason for the exception. I also tried Types.PICKLED_BYTE_ARRAY().get_java_type_info(), but I think that uses the same serializer, because I get the same exception.
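For what it's worth, a plausible explanation (an assumption on my side, not verified against the Flink source) is that Flink's BytePrimitiveArraySerializer expects its own length-prefixed binary format rather than raw Kafka bytes: a 4-byte big-endian length, then the payload. A pickled object instead starts with the pickle protocol header (0x80 ...), whose high bit makes that "length" come out as a large negative number, which would match the NegativeArraySizeException above. A small stdlib sketch:

```python
import pickle
import struct

# A pickled Python object, as it would sit in the Kafka topic.
payload = pickle.dumps({"user": "wouter"})

# If a deserializer treats the first 4 bytes as a big-endian signed
# length prefix, the pickle header (first byte 0x80) makes the value negative.
(length,) = struct.unpack(">i", payload[:4])
assert length < 0  # a large negative "array size"
```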

Any suggestions? Thanks for your help!

Regards,
Wouter


Re: ByteSerializationSchema in PyFlink

Wouter Zorgdrager
Hi Dian, all,

The way I resolved it for now is to write my own custom serializer, which simply passes the bytes through unchanged. See the code below:
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class KafkaBytesSerializer implements SerializationSchema<byte[]>, DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] bytes) throws IOException {
        return bytes;
    }

    @Override
    public boolean isEndOfStream(byte[] bytes) {
        return false;
    }

    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}


This code is packaged in a jar and loaded through env.add_jars. That works like a charm!
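With the raw bytes passing through untouched, the actual (de)serialization lives entirely on the Python side. A minimal sketch of what the map functions around the consumer and producer could look like (the record structure here is a made-up example, not from my actual job):

```python
import pickle

def to_kafka_bytes(obj) -> bytes:
    # Serialize any picklable Python object before handing it to the producer.
    return pickle.dumps(obj)

def from_kafka_bytes(raw: bytes):
    # Deserialize the raw bytes coming out of the consumer.
    return pickle.loads(raw)

# Round trip: what the producer writes, the consumer recovers unchanged.
event = {"request_id": 42, "body": [1, 2, 3]}
assert from_kafka_bytes(to_kafka_bytes(event)) == event
```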

Thanks for the help!
Wouter


Re: ByteSerializationSchema in PyFlink

Dian Fu
Hi Wouter,

Great to hear, and thanks for sharing!

Regards,
Dian
