Stateful Functions: Routing to remote functions

Jan Brusch
Hi,

Based on Gordon's excellent advice on how to handle JSON messages with
remote functions
(https://www.mail-archive.com/user@.../msg34385.html), I was
able to:

1) Deserialize JSON Messages from a Kafka Stream

2) Route the message to an embedded StatefulFunction

3) Serialize the resulting Protobuf message to JSON and write it back to
a Kafka stream

Now, instead of 2), I would like to route the message to a remote
function, handle it there, and write the result back to the stream as
JSON via the serializer defined in Java. As I understand it, all of this
should work through address-based communication within the Stateful
Functions application. Unfortunately, I can't get it to work. The
relevant code and the error message are below; the rest of the project
structure basically follows the walkthrough example from the
documentation. Any ideas or input would be greatly appreciated.


module.yaml:

--------------------------------

...

spec:
     functions:
       - function:
           meta:
             kind: http
             type: demo/eventCounterPython
           spec:
             endpoint: http://python-worker:8000/statefun
             states:
               - name: count
             maxNumBatchRequests: 500

...

---------------------------------------


EventIO.java

---------------------------------------------

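import java.nio.charset.StandardCharsets;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
import org.apache.kafka.clients.producer.ProducerRecord;

final class EventIO {

    static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
            new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);

    ....

    EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
        return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
                .withKafkaAddress(kafkaAddress)
                .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
                .build();
    }

    ....

    private static final class GlobalEventCountPythonKafkaSerializer
            implements KafkaEgressSerializer<GlobalEventCount> {

        private static final long serialVersionUID = 1L;

        @Override
        public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
            byte[] key = null; // no key: the producer chooses the partition
            final byte[] value;
            try {
                // Print default-valued fields too, so a count of 0 is not omitted.
                String json = JsonFormat.printer().includingDefaultValueFields().print(eventCount);
                value = json.getBytes(StandardCharsets.UTF_8);
            } catch (InvalidProtocolBufferException e) {
                // Fail instead of silently writing a record with a null value.
                throw new IllegalStateException("Could not serialize GlobalEventCount to JSON", e);
            }
            return new ProducerRecord<>("eventCountPython", key, value);
        }
    }
}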

--------------------------------------------
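For comparison, here is a stdlib-only Python sketch of what the serializer above does: it emits the count as UTF-8 JSON with no key and always includes the field, even when it holds the proto3 default of 0 (the role `includingDefaultValueFields()` plays). The dataclass `GlobalEventCount` stand-in and the `to_json`/`to_record` helpers are hypothetical; this is not the protobuf JSON printer.

```python
import json
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class GlobalEventCount:
    # Hypothetical stand-in for the generated protobuf message: one integer field.
    value: int = 0


def to_json(count: GlobalEventCount, include_defaults: bool = True) -> str:
    # proto3 JSON drops fields that hold their default value unless told to
    # include them; this mimics that choice for the single field here.
    fields = {}
    if include_defaults or count.value != 0:
        fields["value"] = count.value
    return json.dumps(fields)


def to_record(topic: str, count: GlobalEventCount) -> Tuple[str, Optional[bytes], bytes]:
    # (topic, key, value): no key, as in the Java serializer, and an explicit
    # UTF-8 encoding instead of the platform default charset.
    return topic, None, to_json(count).encode("utf-8")
```

With `include_defaults=False`, a count of 0 would serialize as `{}`, which is the situation the Java serializer avoids by printing default-valued fields.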


EventModule.java:

--------------------------------------

import java.util.Map;

import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public final class EventModule implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {

        EventIO ioModule = new EventIO("kafka:9092");

        binder.bindIngress(ioModule.getIngressSpec());

        binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());

        binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
        binder.bindEgress(ioModule.getEventCountPythonEgressSpec());

        // Only the embedded Java function is bound here; remote functions come from module.yaml.
        binder.bindFunctionProvider(
                EventCountJavaStatefulFunction.TYPE, unused -> new EventCountJavaStatefulFunction());
    }
}

------------------------------------------
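In this module only the Java counter is bound with `bindFunctionProvider`; the Python type is expected to be registered from module.yaml. A toy model of the lookup shows why a missing module.yaml would produce exactly the `Unknown provider for type FunctionType(demo, eventCounterPython)` error in the trace below. The `FunctionRepository` class is hypothetical and far simpler than the real StateFun loader.

```python
from typing import Callable, Dict, Tuple

FunctionType = Tuple[str, str]  # (namespace, name)


class FunctionRepository:
    """Hypothetical, simplified provider lookup; not StateFun's implementation."""

    def __init__(self) -> None:
        self._providers: Dict[FunctionType, Callable[[], str]] = {}

    def bind(self, function_type: FunctionType, provider: Callable[[], str]) -> None:
        self._providers[function_type] = provider

    def load(self, function_type: FunctionType) -> str:
        provider = self._providers.get(function_type)
        if provider is None:
            namespace, name = function_type
            raise ValueError(f"Unknown provider for type FunctionType({namespace}, {name})")
        return provider()


repo = FunctionRepository()
# Bound in Java via binder.bindFunctionProvider(...).
repo.bind(("demo", "eventCounterJava"), lambda: "embedded EventCountJavaStatefulFunction")
# The remote type would only be added if module.yaml were found and parsed:
# repo.bind(("demo", "eventCounterPython"), lambda: "http://python-worker:8000/statefun")
```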


EventRouter.java

-------------------------------------------

final class EventRouter implements Router<Event> {

    static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
            new FunctionType("demo", "eventCounterPython");
    static final FunctionType JAVA_EVENT_COUNTER_TYPE =
            new FunctionType("demo", "eventCounterJava");

    @Override
    public void route(Event event, Downstream<Event> downstream) {
        // Embedded Java counter, addressed by function type and id.
        downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
        // Remote Python counter (declared in module.yaml), addressed explicitly.
        downstream.forward(new Address(PYTHON_EVENT_COUNTER_TYPE, "count"), event);
    }
}

------------------------------------------


worker.py

-------------------------------------------

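from statefun import kafka_egress_record

# Assumes `functions = StatefulFunctions()` and the generated GlobalEventCount
# class are defined elsewhere in worker.py.


@functions.bind("demo/eventCounterPython")
def handle_event(context, _):
    # Load the counter state; unpack() returns None if it has never been set.
    state = context.state('count').unpack(GlobalEventCount)
    if not state:
        state = GlobalEventCount()
        state.value = 1
    else:
        state.value += 1
    context.state('count').pack(state)

    egress_message = kafka_egress_record(topic="eventCountPython", value=state)
    context.pack_and_send_egress("demo/eventCountPython", egress_message)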

------------------------------------------
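State in a stateful function is scoped to its address, i.e. the pair of function type and id. Because the router sends every event to the same id, `"count"`, all events land on a single instance and the counter is effectively global. The stdlib-only sketch below models that with a dict keyed by address; `FakeStateStore` and `handle_event` are hypothetical stand-ins, not SDK classes.

```python
from typing import Dict, Optional, Tuple

Address = Tuple[str, str]  # (function type, id)


class FakeStateStore:
    """Hypothetical per-address state store; not part of the StateFun SDK."""

    def __init__(self) -> None:
        self._values: Dict[Address, int] = {}

    def get(self, address: Address) -> Optional[int]:
        return self._values.get(address)

    def set(self, address: Address, value: int) -> None:
        self._values[address] = value


def handle_event(store: FakeStateStore, address: Address) -> int:
    # Same shape as the Python worker: start at 1 if no state exists, else increment.
    previous = store.get(address)
    current = 1 if previous is None else previous + 1
    store.set(address, current)
    return current
```

Routing on a key taken from the event instead of a constant id would give per-key counts and let different instances be processed in parallel.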


ERROR MESSAGE

------------------------------------------

worker_1         | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task                     - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
worker_1         | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
worker_1         |     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
worker_1         |     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
worker_1         |     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533

------------------------------------------


Best regards and thanks for taking the time to read all this,

Jan

Re: Stateful Functions: Routing to remote functions

Igal Shilman
Hi Jan,
Judging by the exception message, it seems that the function type "demo/eventCounterPython" is not known to Stateful Functions.
This can happen if the module.yaml (provided in your email) was accidentally excluded from the resulting artifact (Docker image or jar-with-dependencies).
Can you please verify that the module.yaml is present at runtime?
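One way to do that check is a small stdlib-only script run inside each container: it confirms that a module.yaml exists at the path you expect and declares the function type. It scans `type:` lines rather than parsing YAML, so it is a sanity check, not a validator; the script, its function names, and the path you pass in are all hypothetical.

```python
import re
from pathlib import Path
from typing import List

TYPE_LINE = re.compile(r"^\s*type:\s*(\S+)\s*$")


def declared_types(module_yaml: Path) -> List[str]:
    # Collect every 'type: <value>' line; crude, but enough for a sanity check.
    types = []
    for line in module_yaml.read_text(encoding="utf-8").splitlines():
        match = TYPE_LINE.match(line)
        if match:
            types.append(match.group(1))
    return types


def check(module_yaml: Path, expected_type: str) -> bool:
    if not module_yaml.is_file():
        print(f"{module_yaml}: not found")
        return False
    types = declared_types(module_yaml)
    if expected_type not in types:
        print(f"{module_yaml}: {expected_type!r} not declared (found {types})")
        return False
    print(f"{module_yaml}: declares {expected_type!r}")
    return True


def main(argv: List[str]) -> int:
    # Usage: main(["/path/to/module.yaml", "demo/eventCounterPython"])
    return 0 if check(Path(argv[0]), argv[1]) else 1
```

Adding `sys.exit(main(sys.argv[1:]))` at the bottom turns it into a command-line check; where module.yaml actually lives depends on your Dockerfile.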

Kind regards,
Igal.

On Thu, Jul 2, 2020 at 7:29 PM Jan Brusch <[hidden email]> wrote:

Re: Stateful Functions: Routing to remote functions

Jan Brusch

Hi Igal,

Thanks for your reply. I initially thought the same thing, but it turns out I can call the remote function from an embedded "wrapper" function using the exact same setup (relevant code below). So that's one kind of solution to the problem. But to me it seems like a bit of a hack, not the idiomatic way to solve this...

From my understanding of address-based communication in Flink Stateful Functions, it should be possible to call that function from the router directly. But I am probably either using the router wrong or misunderstanding some of the ideas behind address-based communication...


EventRouter.java

------------------------------------------------------------------------------------

final class EventRouter implements Router<Event> {

  @Override
  public void route(Event event, Downstream<Event> downstream) {
    downstream.forward(EventCounterWrapper.TYPE, "_", event);
  }
}

--------------------------------------------------------------------------------------


EventCounterWrapper.java

---------------------------------------------------------------------------------------

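import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;

public class EventCounterWrapper implements StatefulFunction {

    static final FunctionType TYPE = new FunctionType("demo", "eventCounterWrapper");
    public static final FunctionType REMOTE_FUNCTION_TYPE =
            new FunctionType("demo/external", "eventCounterPython");

    @Override
    public void invoke(Context context, Object input) {
        if (input instanceof Event) {
            // Message from the router: wrap it in Any and call the remote function.
            Event event = (Event) input;
            Any message = Any.pack(event);
            context.send(REMOTE_FUNCTION_TYPE, "_", message);
        } else if (input instanceof Any) {
            // Reply from the remote function: unpack it and write it to the egress.
            final EventCount eventCount;
            try {
                eventCount = ((Any) input).unpack(EventCount.class);
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Unexpected type", e);
            }
            context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
        }
    }
}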

-----------------------------------------------------------------------------------


worker.py
----------------------------------------------------
from google.protobuf.any_pb2 import Any


@functions.bind("demo/external/eventCounterPython")
def handle_event(context, _):
    state = context.state('count').unpack(EventCount)
    if not state:
        state = EventCount()
        state.count = 1
    else:
        state.count += 1
    context.state('count').pack(state)

    # Reply to the caller (the embedded wrapper) with the count wrapped in Any.
    envelope = Any()
    envelope.Pack(state)
    context.reply(envelope)
----------------------------------------------------
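The wrapper's round trip can be modeled in plain Python: the embedded function sends an Any-wrapped event to the remote address, the remote function replies to its caller, and the wrapper unpacks the reply and forwards it to the egress. Everything here (`Envelope`, `Runtime`, the handler methods) is a hypothetical, single-threaded stand-in for the StateFun message loop, not its actual implementation.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, Dict, List, Optional, Tuple

Address = Tuple[str, str]  # (function type, id)

# Addresses mirroring the thread; the ids "_" match the wrapper code.
WRAPPER: Address = ("demo/eventCounterWrapper", "_")
REMOTE: Address = ("demo/external/eventCounterPython", "_")


@dataclass
class Envelope:
    # Stand-in for com.google.protobuf.Any: a type tag plus a payload.
    type_name: str
    payload: object


class Runtime:
    """Hypothetical single-threaded message loop with reply-to-caller semantics."""

    def __init__(self) -> None:
        # Each entry is (caller, target, message); the caller is None for ingress.
        self.queue: Deque[Tuple[Optional[Address], Address, object]] = deque()
        self.state: Dict[Address, int] = {}
        self.egress: List[object] = []

    def send(self, caller: Optional[Address], target: Address, message: object) -> None:
        self.queue.append((caller, target, message))

    def run(self) -> None:
        while self.queue:
            caller, target, message = self.queue.popleft()
            if target == WRAPPER:
                self.wrapper(message)
            elif target == REMOTE:
                self.remote(caller, message)

    def wrapper(self, message: object) -> None:
        if isinstance(message, Envelope):
            # Reply from the remote function: unpack it and hand it to the egress.
            self.egress.append(message.payload)
        else:
            # Raw event from the router: wrap it and call the remote function.
            self.send(WRAPPER, REMOTE, Envelope("Event", message))

    def remote(self, caller: Optional[Address], message: object) -> None:
        # Count per address, then reply to whoever called (like context.reply).
        count = self.state.get(REMOTE, 0) + 1
        self.state[REMOTE] = count
        if caller is not None:
            self.send(REMOTE, caller, Envelope("EventCount", count))
```

In this model, messages entering through the router have no caller, so only calls made from a function can be answered with a reply.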


module.yaml

---------------------------------------------------------

spec:
    functions:
      - function:
          meta:
            kind: http
            type: demo/external/eventCounterPython
          spec:
            endpoint: http://python-worker:8000/statefun
            states:
              - count

---------------------------------------------------------
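In this working setup, the module.yaml type `demo/external/eventCounterPython` is reached from Java as `FunctionType("demo/external", "eventCounterPython")`, i.e. everything before the last slash is the namespace. The sketch below assumes that last-slash rule (consistent with this setup, but not checked against the StateFun source) and shows how `demo/eventCounterPython` from the first email corresponds to `FunctionType(demo, eventCounterPython)` in the stack trace. `parse_typename` and `format_function_type` are hypothetical helpers.

```python
from typing import Tuple


def parse_typename(typename: str) -> Tuple[str, str]:
    # Assumption: namespace = everything before the last '/', name = the rest.
    namespace, sep, name = typename.rpartition("/")
    if not sep or not namespace or not name:
        raise ValueError(f"expected '<namespace>/<name>', got {typename!r}")
    return namespace, name


def format_function_type(typename: str) -> str:
    # Render the pair the way it appears in the StateFun exception message.
    namespace, name = parse_typename(typename)
    return f"FunctionType({namespace}, {name})"
```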


Best regards

Jan


On 03.07.20 17:33, Igal Shilman wrote:

-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Phone (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Managing Directors: Thomas Gebauer, Jan Zander
Register court: Amtsgericht Bremen, HRB 23395 HB
VAT ID: DE 246585501

Re: Stateful Functions: Routing to remote functions

Igal Shilman
Hi Jan,

Two follow-up questions:

1. Looking at the stack trace in your email, it does seem like the function type is unavailable, and I'd like to follow up on that. Could you please share your Dockerfile so that
we have the complete picture? If you are not comfortable sharing it, could you exec into the containers and manually verify that the module.yaml is present
in both the "worker" and the "master" images, and that it defines the remote function type correctly?

2. In your original email, the router does not wrap messages in Any; it
forwards them as-is. The remote functions API requires that messages sent to a remote function
are of type Any. Can you try something like this:

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.Router;

final class EventRouter implements Router<Message> {

    static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
            new FunctionType("demo", "eventCounterPython");
    static final FunctionType JAVA_EVENT_COUNTER_TYPE =
            new FunctionType("demo", "eventCounterJava");

    @Override
    public void route(Message event, Downstream<Message> downstream) {
        // The embedded Java function can receive the message as-is.
        downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
        // Remote functions expect an Any, so wrap the message first.
        downstream.forward(new Address(PYTHON_EVENT_COUNTER_TYPE, "count"), Any.pack(event));
    }
}


In addition, you would have to change the definition of your ingress identifier so that its produced type is com.google.protobuf.Message
instead of Event.
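Point 2 relies on `com.google.protobuf.Any`, which pairs serialized bytes with a type URL; `Any.pack` uses the prefix `type.googleapis.com/` followed by the message's full name. The stdlib-only sketch below imitates that envelope to show why the receiver can check and unpack a payload only when it arrives wrapped. JSON stands in for the protobuf wire format, and the full name `demo.Event` is an assumption, not taken from the thread.

```python
import json
from dataclasses import dataclass
from typing import Dict

TYPE_URL_PREFIX = "type.googleapis.com/"


@dataclass(frozen=True)
class AnyEnvelope:
    # Imitation of google.protobuf.Any: a type URL plus opaque bytes.
    type_url: str
    value: bytes


def pack(full_name: str, fields: Dict[str, object]) -> AnyEnvelope:
    # JSON stands in for the protobuf wire format in this sketch.
    return AnyEnvelope(TYPE_URL_PREFIX + full_name, json.dumps(fields).encode("utf-8"))


def unpack(envelope: AnyEnvelope, expected_full_name: str) -> Dict[str, object]:
    # Like Java's Any.unpack(Class), refuse a payload of a different type.
    if envelope.type_url != TYPE_URL_PREFIX + expected_full_name:
        raise TypeError(f"expected {expected_full_name}, got {envelope.type_url}")
    return json.loads(envelope.value.decode("utf-8"))
```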


Good luck!
Igal


Re: Stateful Functions: Routing to remote functions

Jan Brusch

Hi Igal,

thanks for your comprehensive reply!

As for point 1, I will try to create a minimal reproduction of the case and share the code with you. It might be a few days until I get around to it.
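For point 1, the check Igal asks for (is module.yaml present, and does it declare the remote function type?) can be scripted. Below is a minimal Java sketch that is not part of StateFun. It walks a directory, finds every file named module.yaml and collects the values of its `type:` lines. This is a naive line-based scan rather than a YAML parser, so it also reports any module-level `type:` line (such as `type: remote`). It assumes you first copy the modules directory out of each container, for example with `docker cp <container>:/opt/statefun/modules ./modules`. The container path is an assumption about where the Dockerfile places module.yaml, so adjust it to your image. Requires Java 11 or later.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public final class ModuleYamlTypes {

    // Collects the value of every "type:" line in every module.yaml below root.
    // Naive text scan: indentation and nesting are ignored.
    static List<String> declaredTypes(Path root) throws IOException {
        List<String> types = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                if (!Files.isRegularFile(p) || !p.getFileName().toString().equals("module.yaml")) {
                    continue;
                }
                for (String line : Files.readAllLines(p)) {
                    String trimmed = line.trim();
                    if (trimmed.startsWith("type:")) {
                        types.add(trimmed.substring("type:".length()).trim());
                    }
                }
            }
        }
        return types;
    }

    public static void main(String[] args) throws IOException {
        // Assumed location of the copied modules directory; pass another path as args[0].
        Path root = Path.of(args.length > 0 ? args[0] : "./modules");
        List<String> types = declaredTypes(root);
        System.out.println(types.contains("demo/eventCounterPython")
                ? "demo/eventCounterPython is declared"
                : "demo/eventCounterPython NOT found; declared types: " + types);
    }
}
```

Running it against the copies taken from both the "worker" and the "master" container shows whether either image is missing the file or declares a different type string.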

As for point 2, I will definitely give this a try. From the looks of it, this seems to be the solution, and it points to the error in my thinking: sending unwrapped messages to external functions...


Best regards and many thanks!

Jan


Re: Stateful Functions: Routing to remote functions

Jan Brusch
In reply to this post by Igal Shilman

Hi Igal,

Today I finally got around to trying your proposed solutions. Unfortunately, I didn't get them to work. However, since you asked me to share the code, I prepared an example that provides a minimal reproduction of my use case and the corresponding error.

Please find it here: https://github.com/Bruschkov/statefun-remote-counter-example

As I said in an earlier mail, I found a way to get my desired setup working via a wrapper function. However, I am still interested in whether and how a "direct routing" solution would work.
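When routing directly, the router's FunctionType has to match a function type declared in module.yaml exactly. A mismatch surfaces as an "Unknown provider for type FunctionType(...)" error. The wrapper from the earlier mail addressed the remote function as FunctionType("demo/external", "eventCounterPython") while module.yaml declared demo/external/eventCounterPython, which suggests the namespace part may itself contain a slash. Below is a minimal sketch of that mapping. It assumes the type string is split at its last '/'. This is consistent with the wrapper working, but it is an assumption about the runtime's parser, not a quote of it. `FunctionTypeName` is a hypothetical helper, not part of the StateFun API.

```java
public final class FunctionTypeName {

    // (namespace, name) pair mirroring the two arguments of a Java-side FunctionType.
    record Name(String namespace, String name) {
        // Joins back to the "namespace/name" form used in module.yaml.
        String asTypeString() {
            return namespace + "/" + name;
        }
    }

    // Splits at the last '/': "demo/external/eventCounterPython" -> ("demo/external", "eventCounterPython").
    static Name parse(String typeString) {
        int pos = typeString.lastIndexOf('/');
        if (pos <= 0 || pos == typeString.length() - 1) {
            throw new IllegalArgumentException(typeString + " is not of the form <namespace>/<name>");
        }
        return new Name(typeString.substring(0, pos), typeString.substring(pos + 1));
    }

    public static void main(String[] args) {
        Name routerTarget = new Name("demo", "eventCounterPython");
        Name declared = parse("demo/external/eventCounterPython");
        // If no declared type equals the router's target, the runtime has no provider for it.
        System.out.println(routerTarget.equals(declared)
                ? "router target matches module.yaml"
                : "mismatch: router targets " + routerTarget.asTypeString()
                        + " but module.yaml declares " + declared.asTypeString());
    }
}
```

Comparing the router's constants against every declared type this way rules out a naming mismatch before looking further at packaging or message wrapping.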

If you find the time to look at the code and give me some feedback, I would really appreciate it.


Best regards

Jan

On 06.07.20 14:11, Igal Shilman wrote:
Hi Jan,

Two followup questions:

1. Looking at the stack trace provided in your email, it does seem like the function type is unavailable, and I'd like to follow up on that: can you please share your Dockerfile, so
we have the complete picture. If you are not comfortable sharing that, then you can please try to execute into the container and manually validate that the module.yaml is present 
both on the "worker" image and the "master" image, and it defines the remote function name correctly?

2. In your original email, the provided router does not route messages of type Any, but it actually 
forwards them as-in, the remote functions API requires that the message being sent to the remote function
is of type Any.  Can you try something like this:

final class EventRouter implements Router<com.google.protobuf.Message > {

     static final FunctionType PYTHON_EVENT_COUNTER_TYPE = new
FunctionType("demo", "eventCounterPython");
     static final FunctionType JAVA_EVENT_COUNTER_TYPE = new
FunctionType("demo", "eventCounterJava");
     @Override
     public void route(com.google.protobuf.Message event, Downstream<com.google.protobuf.Message> downstream) {
         downstream.forward(
                 JAVA_EVENT_COUNTER_TYPE,
                 "count",
                 event)
         ;
         downstream.forward(
                 new Address(
                         PYTHON_EVENT_COUNTER_TYPE,
                         "count"
                 ),
                 Any.pack(event)
         );
     }
}


In addition you would have to change the definition of your ingress identifier to have a produced type of com.google.protobuf.Message 
instead of an Event.


Good luck!
Igal

On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch <[hidden email]> wrote:

Hi Igal,

thanks for your reply. Initially I thought the same thing, but it turns out I am able to call the remote function from an embedded "wrapper" function using the exact same setup (Relevant Code below). So that's one kind of solution to that Problem. But to me it seems like it's a bit of a hack and not the idiomatic way to solve this...

From my understanding of the address based communication within Flink Stateful Functions, I feel like it should be possible to call that function from the router directly. But I am probably either using the Router wrong or misunderstand some of the ideas behind address based communication...


EventRouter.java

------------------------------------------------------------------------------------

final class EventRouter implements Router<Event> {

  @Override
  public void route(Event event, Downstream<Event> downstream) {
    downstream.forward(EventCounterWrapper.TYPE, "_", event);
  }
}

--------------------------------------------------------------------------------------


EventCounterWrapper.java

---------------------------------------------------------------------------------------

public class EventCounterWrapper implements StatefulFunction {

    static final FunctionType TYPE = new FunctionType("demo", "eventCounterWrapper");
    public static final FunctionType REMOTE_FUNCTION_TYPE = new FunctionType("demo/external", "eventCounterPython");

    @Override
    public void invoke(Context context, Object input) {
        if (input instanceof Event) {
            Event event = (Event) input;
            Any message = Any.pack(event);
            context.send(REMOTE_FUNCTION_TYPE, "_", message);
        }

        if (input instanceof Any) {
            final EventCount eventCount;
            try {
                eventCount = ((Any) input).unpack(EventCount.class);
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Unexpected type", e);
            }
            context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
        }
    }
}

-----------------------------------------------------------------------------------


worker.py
----------------------------------------------------
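from google.protobuf.any_pb2 import Any
from statefun import StatefulFunctions

# EventCount is the generated Protobuf class (its import is not shown here).

functions = StatefulFunctions()


@functions.bind("demo/external/eventCounterPython")
def handle_event(context, _):
    # On the first invocation the state is still empty and unpack() returns None.
    state = context.state('count').unpack(EventCount)
    if not state:
        state = EventCount()
        state.count = 1
    else:
        state.count += 1
    context.state('count').pack(state)

    # Wrap the result in an Any and reply to the caller (EventCounterWrapper).
    envelope = Any()
    envelope.Pack(state)
    context.reply(envelope)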
----------------------------------------------------


module.yaml

---------------------------------------------------------

spec:
    functions:
      - function:
          meta:
            kind: http
            type: demo/external/eventCounterPython
          spec:
            endpoint: http://python-worker:8000/statefun
            states:
              - count

---------------------------------------------------------
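For the wrapper to reach the remote function, three declarations have to name the same function type: the `FunctionType` in `EventCounterWrapper`, the string passed to `@functions.bind`, and `meta.type` in module.yaml. The sketch below checks that agreement in plain Python. It assumes that a type string is split into namespace and name at the last `/`, so that `demo/external` stays one namespace; that is consistent with the Java constant above, but it is an assumption worth confirming for the StateFun version in use. `split_typename` is a hypothetical helper.

```python
from typing import Tuple


def split_typename(typename: str) -> Tuple[str, str]:
    # Assumption: everything before the LAST slash is the namespace.
    namespace, sep, name = typename.rpartition("/")
    if not sep or not namespace or not name:
        raise ValueError(f"expected 'namespace/name', got {typename!r}")
    return namespace, name


# The three declarations from this message, as (namespace, name) pairs.
java_remote_type = ("demo/external", "eventCounterPython")  # EventCounterWrapper.REMOTE_FUNCTION_TYPE
python_bind_type = split_typename("demo/external/eventCounterPython")  # @functions.bind(...)
yaml_meta_type = split_typename("demo/external/eventCounterPython")  # module.yaml meta.type

types_agree = java_remote_type == python_bind_type == yaml_meta_type
```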


Best Regards

Jan


On 03.07.20 17:33, Igal Shilman wrote:
Hi Jan,
Judging by the exception message, it seems that the function type "demo/eventCounterPython" is not known to Stateful Functions.
This could happen if the module.yaml (provided in your email) was accidentally excluded from the resulting artifact (Docker image or a jar-with-dependencies).
Can you please verify that the module.yaml is present at runtime?

Kind regards,
Igal.
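A jar-with-dependencies is an ordinary zip archive, so one quick way to follow this suggestion is to list its entries and look for `module.yaml`. The helper below (`jar_contains` is a hypothetical name) uses only Python's standard `zipfile` module. It only shows whether the file was packaged at all; where StateFun actually looks for it depends on the deployment, and for a Docker image you would inspect the image's file system instead.

```python
import zipfile


def jar_contains(jar, entry_name="module.yaml"):
    """Return True if the jar (a zip archive; path or file object) has an entry
    named entry_name, either at the root or inside some directory."""
    with zipfile.ZipFile(jar) as archive:
        return any(
            name == entry_name or name.endswith("/" + entry_name)
            for name in archive.namelist()
        )
```

For example, `jar_contains("target/app-jar-with-dependencies.jar")` (the path is hypothetical) returns `False` if the build never copied `module.yaml` into the artifact.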

On Thu, Jul 2, 2020 at 7:29 PM Jan Brusch <[hidden email]> wrote:
Hi,

based on Gordon's excellent advice on how to handle JSON messages with
remote functions
(https://www.mail-archive.com/user@.../msg34385.html), I was
able to:

1) Deserialize JSON messages from a Kafka stream

2) Route the message to an embedded StatefulFunction

3) Serialize the resulting Protobuf message to JSON and write it back to
a Kafka stream

Now, instead of 2), I would like to route the message to a remote
function, handle it there and write it back to the stream as JSON via
the serializer defined in Java. From my understanding, all of this should
work through address-based communication within the Stateful Functions
application. Unfortunately, I can't get it to work. See the relevant code
and error message below. The rest of the project structure basically
follows the walkthrough example from the documentation. Any ideas or
input would be greatly appreciated.


module.yaml:

--------------------------------

...

spec:
     functions:
       - function:
           meta:
             kind: http
             type: demo/eventCounterPython
           spec:
             endpoint: http://python-worker:8000/statefun
             states:
               - name: count
             maxNumBatchRequests: 500

...

---------------------------------------


EventIO.java

---------------------------------------------

final class EventIO {

   static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
       new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);

     ....

   EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
     return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
         .withKafkaAddress(kafkaAddress)
         .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
         .build();
   }

   ....

   private static final class GlobalEventCountPythonKafkaSerializer
       implements KafkaEgressSerializer<GlobalEventCount> {

     private static final long serialVersionUID = 1L;

     @Override
     public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
       byte[] key = null;
       byte[] value = null;

       try {
         String json = JsonFormat
                 .printer()
                 .includingDefaultValueFields()
                 .print(eventCount);
         value = json.getBytes();
       } catch (InvalidProtocolBufferException e) {
         e.printStackTrace();
       }

       return new ProducerRecord<>("eventCountPython", key, value);
     }
   }

}

--------------------------------------------


EventModule.java:

--------------------------------------

public final class EventModule implements StatefulFunctionModule {

     @Override
     public void configure(Map<String, String> globalConfiguration, Binder binder) {

         EventIO ioModule = new EventIO("kafka:9092");

         binder.bindIngress(ioModule.getIngressSpec());

         binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());

         binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
         binder.bindEgress(ioModule.getEventCountPythonEgressSpec());

         binder.bindFunctionProvider(
                 EventCountJavaStatefulFunction.TYPE, unused -> new EventCountJavaStatefulFunction());
     }
}

------------------------------------------


EventRouter.java

-------------------------------------------

final class EventRouter implements Router<Event> {

     static final FunctionType PYTHON_EVENT_COUNTER_TYPE = new FunctionType("demo", "eventCounterPython");
     static final FunctionType JAVA_EVENT_COUNTER_TYPE = new FunctionType("demo", "eventCounterJava");

     @Override
     public void route(Event event, Downstream<Event> downstream) {
         downstream.forward(
                 JAVA_EVENT_COUNTER_TYPE,
                 "count",
                 event);
         downstream.forward(
                 new Address(
                         PYTHON_EVENT_COUNTER_TYPE,
                         "count"
                 ),
                 event
         );
     }
}

------------------------------------------
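In StateFun an address is a function type plus an id, and persisted state such as `count` is scoped to that address. Because the router above uses the constant id `"count"` for every event, all events reach the same function instance and update one shared counter, which fits the `GlobalEventCount` naming. The plain-Python sketch below models only that keying. `FunctionType` and `Address` here are simplified stand-ins for the Java classes, and the `user` field used for the alternative key is hypothetical.

```python
from collections import Counter
from typing import NamedTuple


class FunctionType(NamedTuple):  # simplified stand-in for StateFun's FunctionType
    namespace: str
    name: str


class Address(NamedTuple):  # simplified stand-in for StateFun's Address
    function_type: FunctionType
    id: str


PYTHON_EVENT_COUNTER_TYPE = FunctionType("demo", "eventCounterPython")


def route(events, id_of):
    """Count how many events land on each address for a given id-selection rule."""
    return Counter(Address(PYTHON_EVENT_COUNTER_TYPE, id_of(event)) for event in events)


events = [{"user": "a"}, {"user": "b"}, {"user": "a"}]  # hypothetical event payloads
constant_id = route(events, lambda event: "count")  # what EventRouter does
per_user_id = route(events, lambda event: event["user"])  # hypothetical alternative key
```

With the constant id every event shares one address, and therefore one `count` state; keying by a field of the event would instead give one counter per distinct value.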


worker.py

-------------------------------------------

@functions.bind("demo/eventCounterPython")
def handle_event(context, _):
     state = context.state('count').unpack(GlobalEventCount)
     if not state:
         state = GlobalEventCount()
         state.value = 1
     else:
         state.value += 1
     context.state('count').pack(state)

     egress_message = kafka_egress_record(topic="eventCountPython", value=state)
     context.pack_and_send_egress("demo/eventCountPython", egress_message)

------------------------------------------


ERROR MESSAGE

------------------------------------------

worker_1         | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task                     - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
worker_1         | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
worker_1         |     at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
worker_1         |     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
worker_1         |     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
worker_1         |     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)

------------------------------------------
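The trace shows the failure happening when the runtime tries to load a function for `FunctionType(demo, eventCounterPython)` and finds no provider registered for it. The toy model below illustrates the mechanism: the embedded Java function is registered explicitly by `binder.bindFunctionProvider(...)` in `EventModule`, whereas the remote function would only be known if `module.yaml` is picked up. `FunctionRegistry`, `UnknownProviderError` and the `module_yaml_loaded` flag are hypothetical; this is a simplified model, not StateFun's actual loader.

```python
class UnknownProviderError(Exception):
    pass


class FunctionRegistry:
    """Toy model of a (namespace, name) -> provider lookup; not StateFun's code."""

    def __init__(self):
        self._providers = {}

    def bind(self, namespace, name, provider):
        self._providers[(namespace, name)] = provider

    def load(self, namespace, name):
        provider = self._providers.get((namespace, name))
        if provider is None:
            raise UnknownProviderError(
                f"Unknown provider for type FunctionType({namespace}, {name})"
            )
        return provider()


registry = FunctionRegistry()
# EventModule binds the embedded Java function explicitly.
registry.bind("demo", "eventCounterJava", lambda: "EventCountJavaStatefulFunction")

# Remote functions would be registered from module.yaml, but only if it is found.
module_yaml_loaded = False
if module_yaml_loaded:
    registry.bind("demo", "eventCounterPython", lambda: "remote HTTP function")

error_message = None
try:
    registry.load("demo", "eventCounterPython")
except UnknownProviderError as err:
    error_message = str(err)
```

Setting `module_yaml_loaded = True` makes the lookup succeed, which is the situation the fix (getting module.yaml into the artifact) aims for.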


Best regards and thanks for taking the time to read all this,

Jan

-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501