Hi,

I’m trying to work through an example with Flink Stateful Functions in Python. I have a series of custom protobuf messages that I’ve defined, but I’m struggling with how they should be provided to the runtime so that the messages in Kafka can be deserialized. I see

    type: statefun.kafka.io/routable-protobuf-ingress
    id: example/names

in the example, but how can I change that to my.namespace.com/IngressMessage? Do I need to provide the protobuf compiled JAR in my Python app?

Thanks,
Dan
Hi Dan,

I'm assuming that you have different Kafka topics, and that each topic contains messages of a single protobuf type. In that case, you have to specify the mapping from each topic name to its Protobuf message type.

To do that, assume that you have a Kafka topic A that contains protobuf messages of type my.namespace.com/IngressMessageA, and a Kafka topic B that contains protobuf messages of type my.namespace.com/IngressMessageB. Your ingress definition would then look like this:

    - ingress:
        meta:
          type: statefun.kafka.io/routable-protobuf-ingress
          id: ...
        spec:
          ...
          topics:
            - topic: A
              typeUrl: com.googleapis/my.namespace.com/IngressMessageA
              targets:
                - example/greeter
            - topic: B
              typeUrl: com.googleapis/my.namespace.com/IngressMessageB
              targets:
                - example/greeter

Now your Python function (example/greeter) will receive a Protobuf Any type [1] with its typeUrl set according to the ingress definition, and you can safely unpack it. [2]

    @functions.bind("example/greeter")
    def greeter(context, message):
        if message.Is(IngressMessageA.DESCRIPTOR):
            ...
        elif message.Is(IngressMessageB.DESCRIPTOR):
            ...

I hope that helps,
Igal.

On Wed, Sep 30, 2020 at 9:10 PM Clements, Danial C <[hidden email]> wrote:
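
For anyone who wants to try the Is/Unpack branching from the reply without a running StateFun cluster, here is a minimal sketch that uses only the protobuf Python package. It makes several assumptions: StringValue and Int32Value (protobuf's built-in wrapper types) stand in for the generated IngressMessageA and IngressMessageB classes, and dispatch and wrap are hypothetical helper names that are not part of the StateFun SDK. In a real deployment the runtime does the wrapping for you, based on the typeUrl in the ingress definition.

```python
from google.protobuf.any_pb2 import Any
from google.protobuf.wrappers_pb2 import Int32Value, StringValue


def wrap(payload, prefix="com.googleapis/"):
    """Hypothetical helper: wrap a message in an Any, roughly what the
    ingress is assumed to do using the configured typeUrl."""
    envelope = Any()
    envelope.Pack(payload, type_url_prefix=prefix)
    return envelope


def dispatch(message):
    """Hypothetical helper: branch on the Any's type, then unpack it."""
    if message.Is(StringValue.DESCRIPTOR):  # stand-in for IngressMessageA
        payload = StringValue()
        message.Unpack(payload)
        return "string:" + payload.value
    if message.Is(Int32Value.DESCRIPTOR):  # stand-in for IngressMessageB
        payload = Int32Value()
        message.Unpack(payload)
        return "int:" + str(payload.value)
    raise ValueError("unexpected type URL: " + message.type_url)


if __name__ == "__main__":
    print(wrap(StringValue(value="Dan")).type_url)
    print(dispatch(wrap(StringValue(value="Dan"))))
    print(dispatch(wrap(Int32Value(value=7))))
```

One thing worth checking in your own setup: in the Python protobuf library, Is() compares the part of the type URL after the last "/" with the message's fully qualified protobuf name (package plus message name), so the typeUrl in the ingress definition has to end with that name for the branches to match.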