Help with Python Stateful Functions Types

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Help with Python Stateful Functions Types

Clements, Danial C

Hi,

 

I’m trying to work through an example with Flink Stateful Functions in Python.  I have a series of custom protobuf messages that I’ve defined but I’m struggling with how they should be provided to the runtime so the messages in Kafka can be deserialized.  I see,

 

type: statefun.kafka.io/routable-protobuf-ingress

id: example/names

in the example, but how can I change that to my.namespace.com/IngressMessage?  Do I need to provide the protobuf compiled JAR in my Python app?

 

Thanks

Dan


This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.

Reply | Threaded
Open this post in threaded view
|

Re: Help with Python Stateful Functions Types

Igal Shilman
Hi Dan,

I'm assuming that you have different Kafka topics, and each topic contains messages of a single protobuf type.

In that case, you have to specify the mapping between a topic name to it's Protobuf message type.
To do that, assume that you have a Kafka topic A that contains protobuf messages of type my.namespace.com/IngressMessageA, and
a Kafka topic B that contains protobuf message of type my.namespace.com/IngressMessageB, then your ingress definition would look like this:

- ingress:
    meta:
      type: statefun.kafka.io/routable-protobuf-ingress
      id: ...
    spec:
      ...
      topics:
        - topic: A
          typeUrl: com.googleapis/my.namespace.com/IngressMessageA
          targets:
            - example/greeter
       - topic: B
        typeUrl: com.googleapis/my.namespace.com/IngressMessageB
        targets:
            - example/greeter

Now, your Python function (example/greeter) will receive a Protobuf Any type [1]
with its typeUrl set according to the ingress definition, and you can safely unpack it. [2]

@functions.bind("example/greeter")
def greeter(context, message):
    if message.Is(IngressMessageA.DESCRIPTOR):
      ...
   elif message.Is(IngressMessageB.DESCRIPTOR):
      ..




I hope that helps,
Igal.

On Wed, Sep 30, 2020 at 9:10 PM Clements, Danial C <[hidden email]> wrote:

Hi,

 

I’m trying to work through an example with Flink Stateful Functions in Python.  I have a series of custom protobuf messages that I’ve defined but I’m struggling with how they should be provided to the runtime so the messages in Kafka can be deserialized.  I see,

 

type: statefun.kafka.io/routable-protobuf-ingress

id: example/names

in the example, but how can I change that to my.namespace.com/IngressMessage?  Do I need to provide the protobuf compiled JAR in my Python app?

 

Thanks

Dan


This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.