
Failed to register Protobuf Kryo serialization

Svend Vanderveken


Hi all,

I'm failing to set up an example of wire serialization with Protobuf. Could you help me figure out what I'm doing wrong?

I'm using a simple protobuf schema:
```
syntax = "proto3";

import "google/protobuf/wrappers.proto";
option java_multiple_files = true;
message DemoUserEvent {
  Metadata metadata = 1;
  oneof payload {
    Created created = 10;
    Updated updated = 11;
  }
  message Created {...}
  message Updated {...}
  ...
}
```

I generate the Java classes from it with this Gradle plugin:


```
plugins {
    id "com.google.protobuf" version "0.8.15"
}
```

I'm generating DemoUserEvent instances with a Java Iterator that looks like this:

```
public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
    transient public final static Faker faker = new Faker();
    ...
    @Override public DemoUserEvent next() {
        return randomCreatedEvent();
    }
    ...
```

I read those two pieces of documentation:

And tried the demo app below:

```
import com.twitter.chill.protobuf.ProtobufSerializer;
...
public static void main(String[] args) {
    final StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
    flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
}
```
But the serialization mechanism still fails to handle my protobuf class:
11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class live.schema.event.user.v1.DemoUserEvent does not contain a getter for field payloadCase_
11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class live.schema.event.user.v1.DemoUserEvent does not contain a setter for field payloadCase_
11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

I've also tried this, without success:

```
flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
```

I'm using these versions:

```
ext {
    javaVersion = '11'
    flinkVersion = '1.12.1'
    scalaBinaryVersion = '2.12'
}
dependencies {
    compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    implementation ("com.twitter:chill-protobuf:0.9.5") {
        exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
    }
    implementation "com.google.protobuf:protobuf-java:3.14.0"
    implementation 'com.github.javafaker:javafaker:1.0.2'
}
```

Any idea what I should try next?

Thanks in advance!

Re: Failed to register Protobuf Kryo serialization

Dawid Wysakowicz-2

Hey,

Why do you say that the way you did it does not work? The logs you posted only say that the class cannot be handled by Flink's built-in mechanism for serializing POJOs, so it falls back to a GenericType, which is serialized with Kryo and should go through your registered serializer.
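
For readers wondering why the class ends up as a GenericType: Flink's documentation describes a POJO as a public class with a public no-argument constructor whose fields are either public or reachable through a getter and a setter. The sketch below is a deliberately simplified approximation of that field rule using plain reflection. It is not Flink's TypeExtractor, and `ProtoLikeEvent` and `PlainEvent` are hypothetical hand-written stand-ins, not generated protobuf code.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

final class PojoCheckSketch {

    /** Hypothetical stand-in for a generated protobuf class: private state, no accessors. */
    public static class ProtoLikeEvent {
        private int payloadCase_;
    }

    /** Hypothetical class that satisfies the simplified field rule checked below. */
    public static class PlainEvent {
        public String userId;
        private long timestamp;

        public PlainEvent() { }

        public long getTimestamp() { return timestamp; }

        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }

    /** Returns the fields that are neither public nor covered by a getter/setter pair. */
    static List<String> invalidPojoFields(Class<?> type) {
        List<String> invalid = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(mods)
                    || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue; // public fields pass; static and transient fields are skipped
            }
            // Assumption of this sketch: underscores are ignored when matching accessor names.
            String name = field.getName().replace("_", "");
            boolean hasGetter = false;
            boolean hasSetter = false;
            for (Method method : type.getMethods()) {
                if (method.getName().equalsIgnoreCase("get" + name)
                        && method.getParameterCount() == 0
                        && method.getReturnType() == field.getType()) {
                    hasGetter = true;
                }
                if (method.getName().equalsIgnoreCase("set" + name)
                        && method.getParameterCount() == 1
                        && method.getParameterTypes()[0] == field.getType()) {
                    hasSetter = true;
                }
            }
            if (!hasGetter || !hasSetter) {
                invalid.add(field.getName());
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        System.out.println(invalidPojoFields(ProtoLikeEvent.class)); // prints [payloadCase_]
        System.out.println(invalidPojoFields(PlainEvent.class));     // prints []
    }
}
```

A class that reports any invalid field here would be the kind that gets treated as a GenericType and handed to Kryo, which is where a registered serializer comes in.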

Best,

Dawid


On 14/02/2021 11:44, Svend Vanderveken wrote:



Re: Failed to register Protobuf Kryo serialization

Dawid Wysakowicz-2

Improvements to the documentation are always welcome.

In this particular case we need to be careful, though, because this is not always the expected behavior. It is expected in your case because you are registering your own Kryo serializer.

More often, however, you want a PojoType rather than a GenericType, and this message helps you identify a problem with your POJO declaration.

Best,

Dawid

On 15/02/2021 11:50, Svend Vanderveken wrote:
Oh!

Indeed, my program was just not starting because I omitted the flinkEnv.execute() call! I confirm it works now.
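
The missing call matters because a DataStream program only declares a pipeline: nothing is read, serialized, or printed until execute() is called on the environment. As a rough analogy only (this is the JDK's `java.util.stream.Stream`, not Flink), the sketch below shows the same deferred behavior, with the terminal operation standing in for execute(). The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class DeferredExecutionSketch {

    /** Declares a pipeline that records each element it sees, and optionally runs it. */
    static List<String> buildPipeline(boolean trigger) {
        List<String> seen = new ArrayList<>();
        // Declaring the pipeline does no work yet; peek() only registers a step.
        Stream<String> pipeline = Stream.of("created", "updated").peek(seen::add);
        if (trigger) {
            // The terminal operation plays the role of execute(): only now do elements flow.
            pipeline.forEach(event -> { });
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(buildPipeline(false)); // prints []
        System.out.println(buildPipeline(true));  // prints [created, updated]
    }
}
```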

Thanks for the quick response.

Do you mind if I submit a small PR to the Flink doc to clarify that those INFO logs are indeed the expected behavior? For example here: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html

Svend



On Mon, Feb 15, 2021 at 10:03 AM Dawid Wysakowicz <[hidden email]> wrote:


--
Svend Vanderveken
Kelesia SPRL - BE 0839 049 010
Twitter: @sv3ndk

