Process with guava cache

Juan Gentile

Hello!

I’m trying to write a process function with a cache (using Guava), following this guide:

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html

But when I run it, I get the following exception:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
localCache (com.criteo.internal.shaded.com.google.common.cache.LocalCache$LocalManualCache)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:272)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
        at com.criteo.flink.CachedOutput.processElement(CachedOutput.scala:28)
        at com.criteo.flink.CachedOutput.processElement(CachedOutput.scala:16)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at com.criteo.internal.shaded.com.google.common.cache.LocalCache.hash(LocalCache.java:1839)
        at com.criteo.internal.shaded.com.google.common.cache.LocalCache.put(LocalCache.java:4148)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)

I guess this is a problem with serializing the state when it gets updated?

Do you know how I could solve this?

By the way, I’m trying to avoid outputting records that were already processed while using a sliding window.
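
To make the deduplication goal concrete, here is a minimal sketch in plain Java with no Flink or Guava APIs. It assumes each record has a string key and that "already processed" means "emitted within the last ttlMillis". The class name SeenFilter, its methods, and the TTL semantics are all hypothetical and not taken from the job above. The idea is to store only a last-emitted timestamp per key, which is a plain value that a generic serializer can handle, rather than a cache object with internal structure. In a keyed Flink function, that per-key timestamp could presumably live in a ValueState<Long>, though that is an assumption and not a confirmed fix for the exception.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: remembers when each key was last emitted and
// suppresses repeats that arrive within ttlMillis of that emission.
class SeenFilter {
    private final long ttlMillis;
    // key -> timestamp (ms) of the last emission for that key
    private final Map<String, Long> lastEmitted = new HashMap<>();

    SeenFilter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true if the record should be emitted at time nowMillis,
    // and records the emission; returns false for a recent duplicate.
    boolean shouldEmit(String key, long nowMillis) {
        Long last = lastEmitted.get(key);
        if (last != null && nowMillis - last < ttlMillis) {
            return false; // seen recently, skip
        }
        lastEmitted.put(key, nowMillis);
        return true;
    }

    // Drops entries whose last emission is at least ttlMillis old,
    // so the map does not grow without bound.
    void evictExpired(long nowMillis) {
        lastEmitted.values().removeIf(ts -> nowMillis - ts >= ttlMillis);
    }

    int size() {
        return lastEmitted.size();
    }
}
```

With a TTL of 1000 ms, a key first seen at time 0 is emitted, the same key at time 500 is suppressed, and the same key at time 1000 is emitted again.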

 

Thanks!

Juan