State Processor API with Beam

Stephen Patel
I've got an Apache Beam pipeline running on Flink (1.9.1).

I've been attempting to read a RocksDB savepoint taken from this Beam-on-Flink pipeline using the State Processor API, but it seems to have some incompatibilities around namespaces. Beam, for instance, uses a String namespace, while the KeyedStateInputFormat uses the VoidNamespace. This manifests as an exception:
Caused by: org.apache.flink.util.StateMigrationException: The new namespace serializer must be compatible.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:524)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
	at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) 
Is there any way to let the namespace type (and value) be specified by the user? 
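
The mismatch can be illustrated with a toy model. The sketch below is not Flink or Beam code: `NamespaceKeySketch`, `NamespaceWriter`, `VOID_NS`, `STRING_NS`, and the byte layout are all hypothetical, chosen only to show that when the namespace is serialized as part of the stored key, a reader that assumes a different namespace type produces different key bytes and so cannot address the entries that were written.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class NamespaceKeySketch {

    /** Hypothetical stand-in for a namespace serializer; not a Flink interface. */
    public interface NamespaceWriter<N> {
        void write(N namespace, ByteArrayOutputStream out);
    }

    // Assumption for illustration: a "void" namespace is a single zero byte.
    public static final NamespaceWriter<Void> VOID_NS = (ns, out) -> out.write(0);

    // Assumption for illustration: a String namespace is length-prefixed UTF-8.
    public static final NamespaceWriter<String> STRING_NS =
            (ns, out) -> writeLengthPrefixed(ns, out);

    // Writes a one-byte length followed by the UTF-8 bytes (short strings only).
    static void writeLengthPrefixed(String value, ByteArrayOutputStream out) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.write(bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    /** Builds the toy storage key: serialized key followed by serialized namespace. */
    public static <N> byte[] compositeKey(String key, N namespace, NamespaceWriter<N> writer) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLengthPrefixed(key, out);
        writer.write(namespace, out);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // The writer side used a String namespace; the reader side assumes a void one.
        byte[] written = compositeKey("user-1", "window-a", STRING_NS);
        byte[] lookedUp = compositeKey("user-1", null, VOID_NS);
        System.out.println("same entry? " + Arrays.equals(written, lookedUp));
    }
}
```

Running `main` prints `same entry? false`. Flink does not get as far as a silent miss: it rejects the incompatible namespace serializer when the state is registered, which is the StateMigrationException in the trace above.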

Re: State Processor API with Beam

Seth Wiesman
Hi Stephen,

You will need to implement a custom operator and use the `transform` method. It's not just that you need to specify the namespace type; you will also need to look into the Beam internals to see how it stores data in Flink state, how it translates between Beam serializers and Flink serializers, etc.

Seth

On Mon, Apr 6, 2020 at 1:02 PM Stephen Patel <[hidden email]> wrote:
> I've got an apache beam pipeline running on flink (1.9.1). [...]

Re: State Processor API with Beam

Stephen Patel
Thanks Seth, I'll look into rolling my own KeyedStateInputFormat.  

On Mon, Apr 6, 2020 at 2:50 PM Seth Wiesman <[hidden email]> wrote:
> You will need to implement a custom operator and use the `transform` method. [...]