Hi,
Based on Gordon's excellent advice on how to handle JSON messages with remote functions (https://www.mail-archive.com/user@.../msg34385.html) I was able to:

1) deserialize JSON messages from a Kafka stream,
2) route the message to an embedded StatefulFunction, and
3) serialize the resulting Protobuf message to JSON and write it back to a Kafka stream.

Now, instead of 2), I would like to route the message to a remote function, handle it there, and write it back to the stream as JSON via the serializer defined in Java. From my understanding, all of this should work through address-based communication within the Stateful Functions application. Unfortunately, I can't get it to work. See the relevant code and error message below. The rest of the project structure basically follows the walkthrough example from the documentation.

Any ideas or input would be greatly appreciated.

module.yaml:
--------------------------------
...
spec:
  functions:
    - function:
        meta:
          kind: http
          type: demo/eventCounterPython
        spec:
          endpoint: http://python-worker:8000/statefun
          states:
            - name: count
          maxNumBatchRequests: 500
...
---------------------------------------

EventIO.java
---------------------------------------------
final class EventIO {

  static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
      new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);

  ....

  EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
    return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
        .withKafkaAddress(kafkaAddress)
        .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
        .build();
  }

  ....

  private static final class GlobalEventCountPythonKafkaSerializer
      implements KafkaEgressSerializer<GlobalEventCount> {

    private static final long serialVersionUID = 1L;

    @Override
    public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
      byte[] key = null;
      byte[] value = null;
      try {
        // Print the Protobuf message as JSON, including fields that hold default values.
        String json = JsonFormat.printer()
            .includingDefaultValueFields()
            .print(eventCount);
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        value = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
      } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
      }
      return new ProducerRecord<>("eventCountPython", key, value);
    }
  }
}
--------------------------------------------

EventModule.java:
--------------------------------------
public final class EventModule implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    EventIO ioModule = new EventIO("kafka:9092");

    binder.bindIngress(ioModule.getIngressSpec());
    binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());
    binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
    binder.bindEgress(ioModule.getEventCountPythonEgressSpec());
    binder.bindFunctionProvider(
        EventCountJavaStatefulFunction.TYPE, unused -> new EventCountJavaStatefulFunction());
  }
}
------------------------------------------

EventRouter.java
-------------------------------------------
final class EventRouter implements Router<Event> {

  static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
      new FunctionType("demo", "eventCounterPython");
  static final FunctionType JAVA_EVENT_COUNTER_TYPE =
      new FunctionType("demo", "eventCounterJava");

  @Override
  public void route(Event event, Downstream<Event> downstream) {
    downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
    downstream.forward(new Address(PYTHON_EVENT_COUNTER_TYPE, "count"), event);
  }
}
------------------------------------------

worker.py
-------------------------------------------
@functions.bind("demo/eventCounterPython")
def handle_event(context, _):
    state = context.state('count').unpack(GlobalEventCount)
    if not state:
        state = GlobalEventCount()
        state.value = 1
    else:
        state.value += 1
    context.state('count').pack(state)

    egress_message = kafka_egress_record(topic="eventCountPython", value=state)
    context.pack_and_send_egress("demo/eventCountPython", egress_message)
------------------------------------------

ERROR MESSAGE
------------------------------------------
worker_1 | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
worker_1 | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
worker_1 | at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
worker_1 | at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
worker_1 | at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
worker_1 | at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
worker_1 | at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
worker_1 | at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
worker_1 | at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
worker_1 | at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
worker_1 | at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
worker_1 | at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
worker_1 | at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
worker_1 | at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
worker_1 | at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
worker_1 | at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
worker_1 | at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
worker_1 | at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
worker_1 | at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
worker_1 | at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
worker_1 | at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
worker_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
worker_1 | at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
worker_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
worker_1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
worker_1 | at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
worker_1 | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
------------------------------------------

Best regards and thanks for taking the time to read all this,
Jan
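The top frames of the stack trace (`PredefinedFunctionLoader.load`, reached from `FunctionGroupOperator.processElement`) suggest that routing itself succeeded and the failure happened later, when the runtime tried to find a provider for `demo/eventCounterPython`. The toy model below illustrates that behaviour, assuming a simple dictionary from function type to provider; `FunctionType`, `Address` and `FunctionRepository` here are hypothetical stand-ins written for this illustration, not the StateFun API.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class FunctionType:
    """Hypothetical stand-in for a function type (namespace/name)."""
    namespace: str
    name: str

    def __str__(self) -> str:
        # Mirrors the formatting seen in the log: FunctionType(demo, eventCounterPython)
        return f"FunctionType({self.namespace}, {self.name})"


@dataclass(frozen=True)
class Address:
    """A message target: a function type plus an instance id."""
    type: FunctionType
    id: str


# A handler takes (instance id, message) and returns some result.
Handler = Callable[[str, object], object]


class FunctionRepository:
    """Toy model of type-to-provider lookup; not the real StateFun classes."""

    def __init__(self) -> None:
        self._providers: Dict[FunctionType, Callable[[], Handler]] = {}

    def bind(self, ftype: FunctionType, provider: Callable[[], Handler]) -> None:
        self._providers[ftype] = provider

    def dispatch(self, address: Address, message: object) -> object:
        provider = self._providers.get(address.type)
        if provider is None:
            # The message was already routed; the error only surfaces on activation.
            raise ValueError(f"Unknown provider for type {address.type}")
        return provider()(address.id, message)


JAVA_COUNTER = FunctionType("demo", "eventCounterJava")
PYTHON_COUNTER = FunctionType("demo", "eventCounterPython")

repo = FunctionRepository()
# Only the embedded Java function is bound, mirroring EventModule.configure.
repo.bind(JAVA_COUNTER, lambda: (lambda fid, msg: f"java counted {msg} for {fid}"))

print(repo.dispatch(Address(JAVA_COUNTER, "count"), "event"))  # java counted event for count
try:
    repo.dispatch(Address(PYTHON_COUNTER, "count"), "event")
except ValueError as err:
    print(err)  # Unknown provider for type FunctionType(demo, eventCounterPython)
```

In the model, binding `PYTHON_COUNTER` is what makes the second dispatch succeed; in the real setup, that binding would have to come from the remote function entry in module.yaml.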
Hi Jan,

judging by the exception message, it seems like the function type "demo/eventCounterPython" is not known to Stateful Functions. This could happen if the module.yaml (provided in your email) was accidentally excluded from the resulting artifact (Docker image or a jar-with-dependencies).

Can you please verify that the module.yaml is present at runtime?

Kind regards,
Igal.

On Thu, Jul 2, 2020 at 7:29 PM Jan Brusch <[hidden email]> wrote:
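Igal's suggestion to verify that module.yaml is present at runtime can be checked with a small script. The sketch below is a crude text search using only the standard library (it does not parse YAML), and the default root `/opt/statefun/modules` is an assumption about where the image keeps its module directories; adjust it to match your Dockerfile.

```python
import re
from pathlib import Path
from typing import List

# Assumption: module directories live here inside the image; adjust for your setup.
DEFAULT_MODULES_ROOT = "/opt/statefun/modules"


def find_function_type(root: str, function_type: str) -> List[Path]:
    """Return every module.yaml under `root` with a `type: <function_type>` line.

    This is a plain text match, not a YAML parse: it confirms that the
    declaration is present, not that the surrounding structure is valid.
    """
    pattern = re.compile(
        r"^\s*type:\s*" + re.escape(function_type) + r"\s*$", re.MULTILINE
    )
    return [
        path
        for path in sorted(Path(root).rglob("module.yaml"))
        if pattern.search(path.read_text(encoding="utf-8"))
    ]


def report(root: str = DEFAULT_MODULES_ROOT,
           function_type: str = "demo/eventCounterPython") -> str:
    """Summarize the search result as a single human-readable line."""
    found = find_function_type(root, function_type)
    if not found:
        return f"no module.yaml under {root} declares {function_type}"
    return f"{function_type} declared in: " + ", ".join(str(p) for p in found)
```

If the image has no Python interpreter, the module directory can first be copied out of the container (for example with `docker cp`) and `report()` run against the copy. Running it for both the master and the worker image covers both places the module needs to be present.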
Hi Igal,

thanks for your reply. Initially I thought the same thing, but it
turns out that I am able to call the remote function from an embedded
"wrapper" function using the exact same setup (relevant code
below). So that's one kind of solution to the problem. But to me
it seems like a bit of a hack and not the idiomatic way to
solve this...

From my understanding of the address-based communication within
Flink Stateful Functions, it should be possible to call that
function from the router directly. But I am probably either using
the Router wrong or misunderstanding some of the ideas behind
address-based communication...
EventRouter.java
------------------------------------------------------------------------------------
final class EventRouter implements Router<Event> {
--------------------------------------------------------------------------------------
EventCounterWrapper.java
---------------------------------------------------------------------------------------
public class EventCounterWrapper implements StatefulFunction {
  static final FunctionType TYPE = new FunctionType("demo", "eventCounterWrapper");
-----------------------------------------------------------------------------------
envelope = Any()
module.yaml
---------------------------------------------------------
spec:
---------------------------------------------------------
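Only fragments of the wrapper code survive in this archive: the `EventCounterWrapper` declaration with type `demo/eventCounterWrapper` and an `envelope = Any()` line. Assuming the wrapper's role is to put the event into a `google.protobuf.Any` envelope before messaging the remote function, the two-hop flow can be sketched as a plain-Python model. Every function name here is hypothetical, messages are plain tuples rather than StateFun objects, and `demo.Event` is a placeholder type name.

```python
from dataclasses import dataclass
from typing import List, Tuple

# Default type-URL prefix used by protobuf's Any.pack.
TYPE_URL_PREFIX = "type.googleapis.com/"


@dataclass(frozen=True)
class AnyEnvelope:
    """Models the two fields of google.protobuf.Any: type_url and value."""
    type_url: str
    value: bytes


# (target function type as "namespace/name", instance id, message)
Message = Tuple[str, str, object]

WRAPPER_TYPE = "demo/eventCounterWrapper"
REMOTE_TYPE = "demo/eventCounterPython"


def route(raw_event: bytes) -> List[Message]:
    """Hypothetical ingress router: forward the event unchanged to the embedded wrapper."""
    return [(WRAPPER_TYPE, "count", raw_event)]


def event_counter_wrapper(instance_id: str, raw_event: bytes) -> List[Message]:
    """Hypothetical embedded wrapper: pack the event into an Any, then send it on."""
    envelope = AnyEnvelope(TYPE_URL_PREFIX + "demo.Event", raw_event)
    return [(REMOTE_TYPE, instance_id, envelope)]


def run(raw_event: bytes) -> List[Message]:
    """Deliver router output to the wrapper and collect what reaches the remote type."""
    delivered: List[Message] = []
    for ftype, fid, msg in route(raw_event):
        if ftype == WRAPPER_TYPE:
            delivered.extend(event_counter_wrapper(fid, msg))
    return delivered
```

The point of the model is only that the remote function never receives an unwrapped event; the extra hop exists solely to do the wrapping.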
Best Regards Jan
On 03.07.20 17:33, Igal Shilman wrote:
--
neuland – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Phone (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi

Managing Directors: Thomas Gebauer, Jan Zander
Registry court: Amtsgericht Bremen, HRB 23395 HB
VAT ID: DE 246585501
Hi Jan,

Two follow-up questions:

1. Looking at the stack trace provided in your email, it does seem like the function type is unavailable, and I'd like to follow up on that: can you please share your Dockerfile, so we have the complete picture? If you are not comfortable sharing that, could you exec into the container and manually verify that the module.yaml is present on both the "worker" image and the "master" image, and that it defines the remote function name correctly?

2. In your original email, the provided router does not route messages of type Any; it forwards them as-is. The remote functions API requires that the message sent to a remote function is of type Any. Can you try something like this:

    final class EventRouter implements Router<com.google.protobuf.Message> {
      downstream.forward(

In addition, you would have to change the definition of your ingress identifier to have a produced type of com.google.protobuf.Message instead of an Event.

Good luck!
Igal

On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch <[hidden email]> wrote:
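Igal's second point can be illustrated without a protobuf dependency. In Java the wrapping would be done with `com.google.protobuf.Any.pack(message)`, which stores the message's full type name in `type_url` (prefixed with `type.googleapis.com/` by default) and its serialized bytes in `value`. The standard-library model below mirrors only that two-field contract; `AnyEnvelope`, `pack`, `unpack`, `send_to_remote` and the type name `demo.Event` are hypothetical, and `send_to_remote` merely encodes the requirement Igal describes rather than any real API.

```python
from dataclasses import dataclass

# Default prefix used by protobuf's Any.pack when no custom prefix is given.
TYPE_URL_PREFIX = "type.googleapis.com/"


@dataclass(frozen=True)
class AnyEnvelope:
    """Models the two fields of google.protobuf.Any: type_url and value."""
    type_url: str
    value: bytes


def pack(full_name: str, payload: bytes) -> AnyEnvelope:
    """Wrap already-serialized message bytes together with their type name."""
    return AnyEnvelope(TYPE_URL_PREFIX + full_name, payload)


def unpack(envelope: AnyEnvelope, expected_full_name: str) -> bytes:
    """Return the payload only if the envelope carries the expected type."""
    # The type name is everything after the last '/' in the URL.
    actual = envelope.type_url.rsplit("/", 1)[-1]
    if actual != expected_full_name:
        raise TypeError(f"expected {expected_full_name}, got {actual}")
    return envelope.value


def send_to_remote(message: object) -> AnyEnvelope:
    """Toy gate modelling the requirement that remote functions receive Any."""
    if not isinstance(message, AnyEnvelope):
        raise TypeError("remote functions expect a google.protobuf.Any envelope")
    return message


raw_event = b"\x08\x01"  # hypothetical serialized bytes of a demo.Event message
envelope = pack("demo.Event", raw_event)
```

Because the envelope carries its own type name, the receiving side has to know (or check) which message type to unpack into, which is why the router's input type has to become a generic Protobuf message rather than `Event`.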
Hi Igal,

thanks for your comprehensive reply! As for 1., I will try to create a minimal reproduction of the case and share the code with you. It might be a few days until I get around to doing it. As for 2., I will definitely give this a try. From the looks of it, this seems to be the solution, and this was the error in my thinking: sending unwrapped messages to external functions...
Best regards and many thanks!
Jan

On 06.07.20 14:11, Igal Shilman wrote:
In reply to this post by Igal Shilman
Hi Igal,

I finally got around to experimenting with your proposed solutions today. Unfortunately, I didn't get them to work. However, you asked me to share the code, so I prepared an example that provides a minimal reproduction of my use case and the corresponding error. Please find it here: https://github.com/Bruschkov/statefun-remote-counter-example

As I said in an earlier mail, I found a way to get my desired setup to work via a wrapper function. However, I am still interested in whether and how a "direct routing" solution would work. If you find the time to look at the code and give me some feedback, I would really appreciate it.
Best regards
Jan

On 06.07.20 14:11, Igal Shilman wrote: