Re: Support for sending generic class

Posted by Tzu-Li (Gordon) Tai on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Support-for-sending-generic-class-tp42624p42641.html

Hi Le,

Thanks for reaching out with this question! It's actually a good segue to allow me to introduce you to StateFun 3.0.0 :)

StateFun 3.0+ comes with a new type system that would eliminate this hassle. You can take a sneak peek here [1].
This is part 1 of a series of tutorials on the fundamentals of the upcoming new Java SDK (you can find tutorials for other languages there as well), and it walks you through the basics of the new type system.

For your specific case, what you would do is implement a `Type` for your Tuple3 messages. The `Type` contains information including a typename to identify the data type, and a serializer for de-/serializing the data.
This `Type` can then be used when creating messages to be sent to other functions and egresses, or used as the type specification for persisted state values.
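
To make the idea concrete, here is a rough sketch of the two pieces such a `Type` bundles: a typename and a pair of serialize/deserialize functions. It deliberately uses only the JDK, so it is not the StateFun API. `LongListLongTriple`, `TripleCodec` and the typename string are hypothetical names made up for illustration, and the byte layout is just one possible choice. For how to wrap functions like these in an actual `Type`, please refer to the showcase in [1].

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for the Tuple3<Long, List<Long>, Long> message. */
final class LongListLongTriple {
  final long first;
  final List<Long> second;
  final long third;

  LongListLongTriple(long first, List<Long> second, long third) {
    this.first = first;
    // Defensive copy so the message is effectively immutable.
    this.second = Collections.unmodifiableList(new ArrayList<>(second));
    this.third = third;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof LongListLongTriple)) {
      return false;
    }
    LongListLongTriple other = (LongListLongTriple) o;
    return first == other.first && third == other.third && second.equals(other.second);
  }

  @Override
  public int hashCode() {
    return Objects.hash(first, second, third);
  }
}

/** Hypothetical codec: a typename plus explicit serialize/deserialize functions. */
final class TripleCodec {
  // Assumed namespace/name; pick whatever identifies the type in your application.
  static final String TYPENAME = "example/LongListLongTriple";

  /** Layout: first (8 bytes), list size (4 bytes), list elements (8 bytes each), third (8 bytes). */
  static byte[] serialize(LongListLongTriple value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeLong(value.first);
      out.writeInt(value.second.size());
      for (long element : value.second) {
        out.writeLong(element);
      }
      out.writeLong(value.third);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the fields back in the same order they were written. */
  static LongListLongTriple deserialize(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      long first = in.readLong();
      int size = in.readInt();
      List<Long> second = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        second.add(in.readLong());
      }
      long third = in.readLong();
      return new LongListLongTriple(first, second, third);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the generic parameters are spelled out in the serializer itself, nothing has to be extracted reflectively from the class, which is exactly the step that fails with the `TypeInformation` error in your stack trace.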

If you're not already running in production, I would strongly suggest waiting for StateFun 3.0.0: it is just around the corner, with a release candidate vote currently ongoing [2], and is expected to be available within 1-2 weeks.

Let me know if this helps!

Cheers,
Gordon

[1] https://github.com/apache/flink-statefun-playground/blob/dev/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part1/types/TypeSystemShowcaseFn.java
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html

On Tue, Mar 30, 2021 at 8:17 PM Till Rohrmann <[hidden email]> wrote:
Hi Le,

I am pulling in Gordon who might be able to help you with your question.

Looking at the Context interface, it looks like you cannot easily specify a TypeHint for the message you want to send. Hence, I guess that you need to register these types explicitly.

Cheers,
Till

On Tue, Mar 30, 2021 at 8:20 AM Le Xu <[hidden email]> wrote:
Hello!

I'm trying to figure out whether Flink Statefun supports sending objects whose class has generic type parameters (and potentially nested types). For example, I send a message that looks like this:

context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject, listOfLongObject, Long));

And obviously I'm getting complaints like this:

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
at org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
at org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
at org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
at org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
at org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
at org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75)
at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52)
at org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36)
at org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92)
at org.apache.flink.statefun.sdk.Context.send(Context.java:88)
at benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292)
at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)

Is there any API in Statefun that supports parameterized classes like this, or does the user function need to handle the serialization itself? Or is there any way to quickly modify the Statefun message interface to support this functionality?

Thanks!

Le