Support for sending generic class


flint-stone
Hello!

I'm trying to figure out whether Flink StateFun supports sending objects whose class has generic type parameters (and potentially nested types). For example, I send a message that looks like this:

context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject, listOfLongObject, Long));

And obviously I'm getting complaints like this:

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
at org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
at org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
at org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
at org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
at org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
at org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75)
at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52)
at org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36)
at org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92)
at org.apache.flink.statefun.sdk.Context.send(Context.java:88)
at benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292)
at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)


Is there an API in StateFun that supports parameterized classes like this, or does the user function need to handle serialization itself? Or is there any way to quickly modify the StateFun message interface to support this?

Thanks!

Le





Re: Support for sending generic class

Till Rohrmann
Hi Le,

I am pulling in Gordon who might be able to help you with your question.

Looking at the Context interface, it looks like you cannot easily specify a TypeHint for the message you want to send. Hence, I guess that you need to register these types explicitly.
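
For background on why a Class alone is not enough: Java erases generic parameters at runtime, while an anonymous subclass keeps them in its generic superclass signature. The sketch below illustrates that mechanism with the standard library only. `GenericHint` and `ErasureDemo` are hypothetical names made up for this illustration; this is not Flink's `TypeHint` implementation, just the same general trick.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a type hint; NOT Flink's TypeHint class.
abstract class GenericHint<T> {
  private final Type captured;

  protected GenericHint() {
    // An anonymous subclass records its generic superclass,
    // e.g. GenericHint<List<Long>>, and reflection can read it back.
    ParameterizedType superType =
        (ParameterizedType) getClass().getGenericSuperclass();
    this.captured = superType.getActualTypeArguments()[0];
  }

  Type captured() {
    return captured;
  }
}

final class ErasureDemo {
  // What the runtime class of a List<Long> instance reveals: no element type.
  static String fromClassAlone() {
    List<Long> values = new ArrayList<>();
    return values.getClass().getName();
  }

  // What an anonymous subclass preserves: the full parameterized type.
  static String fromHint() {
    return new GenericHint<List<Long>>() {}.captured().getTypeName();
  }
}
```

Here `fromClassAlone()` yields only the raw class name, whereas `fromHint()` yields the parameterized type including `java.lang.Long`, which is the information the exception message asks for.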

Cheers,
Till

On Tue, Mar 30, 2021 at 8:20 AM Le Xu <[hidden email]> wrote:

Re: Support for sending generic class

Tzu-Li (Gordon) Tai
Hi Le,

Thanks for reaching out with this question! It's actually a good segue to allow me to introduce you to StateFun 3.0.0 :)

StateFun 3.0+ comes with a new type system that eliminates this hassle. You can take a sneak peek here [1].
This is part 1 of a series of tutorials on the fundamentals of the upcoming new Java SDK (you can find tutorials for other languages there as well), and it walks you through some of the new type system.

For your specific case, what you would do is implement a `Type` for your Tuple3 messages. The `Type` contains information including a typename to identify the data type, and a serializer for de-/serializing the data.
This `Type` can then be used when creating messages to be sent to other functions and egresses, or used as the type specification for persisted state values.
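
The serializer half of such a `Type` can be sketched with the standard library alone. The example below is a minimal sketch under assumptions: `EventPayload` is a hypothetical concrete class standing in for the Tuple3 of (long, list of longs, long), and the byte layout is an arbitrary choice for illustration. Wiring these two functions into an actual `Type` is omitted here; the showcase in [1] shows how that is done with the SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical message class replacing Tuple3<Long, List<Long>, Long>.
final class EventPayload {
  final long first;
  final List<Long> values;
  final long third;

  EventPayload(long first, List<Long> values, long third) {
    this.first = first;
    this.values = values;
    this.third = third;
  }
}

// Hand-written codec; the layout (long, int count, longs, long) is an assumption.
final class EventPayloadCodec {
  static byte[] serialize(EventPayload payload) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(payload.first);
      out.writeInt(payload.values.size()); // length prefix for the list
      for (long value : payload.values) {
        out.writeLong(value);
      }
      out.writeLong(payload.third);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static EventPayload deserialize(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      long first = in.readLong();
      int count = in.readInt();
      List<Long> values = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        values.add(in.readLong());
      }
      long third = in.readLong();
      return new EventPayload(first, values, third);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the generic structure is fixed inside the concrete class and its codec, no generic parameters need to be inferred at runtime.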

If you're not already running in production, I would strongly suggest waiting for StateFun 3.0.0: a release candidate vote is ongoing [2], and the release is expected to be available within 1-2 weeks.

Let me know if this helps!

Cheers,
Gordon

[1] https://github.com/apache/flink-statefun-playground/blob/dev/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part1/types/TypeSystemShowcaseFn.java
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html

On Tue, Mar 30, 2021 at 8:17 PM Till Rohrmann <[hidden email]> wrote:

Re: Support for sending generic class

flint-stone
Hi Gordon and Till:

Thanks for pointing me to the new version! The code I'm using is for a research project, so it's not on any production deadline. However, I would like to know about upcoming updates so that I don't duplicate any work. A couple of questions I have now:
1. Does 3.0 support context.send(customized) from the DataStream interface? If I implement a StateFun application with the DataStream API, is there an example of how to specify customized user types for a DataStream application?
2. I noticed that the 3.0 version supports async operations as specified in this. In the older version, async operations are done through registerAsyncOperation, which ensures the function continuation is still executed by the Flink worker. It seems that in the new version I can simply use Java's standard async API. Does StateFun still ensure that everything within the whenComplete() scope is executed as a StateFun message? (Let's say I use a separate thread pool as a storage client.)
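
The concern behind question 2 can be reproduced with the standard library alone: with a plain CompletableFuture, a non-async whenComplete callback that is registered before the future completes runs on the thread that completes it. The sketch below shows only that JDK behaviour and says nothing about how StateFun schedules continuations; `CallbackThreadDemo` and the thread name "storage-client" are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CallbackThreadDemo {
  // Returns the name of the thread that executed the whenComplete callback.
  static String callbackThreadName() {
    // Hypothetical storage client pool with a recognizable thread name.
    ExecutorService storagePool =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "storage-client"));
    try {
      CountDownLatch registered = new CountDownLatch(1);
      CompletableFuture<String> lookup = CompletableFuture.supplyAsync(() -> {
        try {
          // Hold the result back until the callback has been registered.
          registered.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(e);
        }
        return "value";
      }, storagePool);

      CompletableFuture<String> threadName = new CompletableFuture<>();
      // Non-async callback: runs on whichever thread completes 'lookup'.
      lookup.whenComplete((value, error) ->
          threadName.complete(Thread.currentThread().getName()));
      registered.countDown();

      return threadName.get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      storagePool.shutdown();
    }
  }
}
```

In this setup the callback runs on the storage pool thread rather than on the caller's thread, which is why the question of who executes the continuation matters.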

Thanks for the help!

Le

On Tue, Mar 30, 2021 at 8:39 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote: