[Stateful Functions] Help for calling remote stateful function (written in Python)


Bonino Dario

Dear List,

I am trying to call a sample stateful function defined in Python, using the Stateful Functions Python SDK, from a Flink pipeline. I am building upon the examples provided for the SDK for Flink DataStream Integration, but I am currently stuck on a type cast issue that I am not able to overcome, even by looking at the flink-statefun sources. I am probably doing something wrong.

In the Flink pipeline (of which an excerpt is reported below), I load a set of users from a CSV file and create a DataStream<User>, where User is a protobuf v3 generated class. Given this stream, the basic idea is to forward the stream to a remote function (written in Python using the SDK) that unpacks the user object, extracts the user id and returns it as a String.


val REMOTE_GREET = FunctionType("com.me.try", "echo_user_id")
val GREETINGS = EgressIdentifier<String>("com.me.try", "out", String::class.java)

@JvmStatic
fun main(args: Array<String>) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val usersCsv = env.readTextFile("input/users.csv")
    val users = createUsersStream(usersCsv).shuffle()

    val statefunConfig = StatefulFunctionsConfig.fromEnvironment(env)
    statefunConfig.factoryType = MessageFactoryType.WITH_PROTOBUF_PAYLOADS

    val usersIngress: DataStream<RoutableMessage> = users.map { user ->
        RoutableMessageBuilder.builder()
            .withTargetAddress(REMOTE_GREET, user.userId.toString())
            .withMessageBody(user)
            .build()
    }


    val predictEgress = StatefulFunctionDataStreamBuilder.builder("test")
        .withDataStreamAsIngress(usersIngress)
        .withRequestReplyRemoteFunction(
            RequestReplyFunctionBuilder
                .requestReplyFunctionBuilder(REMOTE_GREET, URI.create("http://127.0.0.1:8000/statefun"))
                .withMaxRequestDuration(Duration.ofSeconds(15))
                .withMaxNumBatchRequests(500)
        )
        .withEgressId(GREETINGS)
        .withConfiguration(statefunConfig)
        .build(env)

    val output = predictEgress.getDataStreamForEgressId(GREETINGS)

    output.print()
    env.execute("Hello stateful!!")
}

Unfortunately, while the Python function seems to be working (tests built by following the Ververica workshop repository about Stateful Functions are up and consistently running) and it is listening at the provided address (http://127.0.0.1:8000/statefun), the Kotlin pipeline (above) fails with a type cast error. The error occurs before the remote function is actually called, at line 90 of org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction. The reported exception is:


Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(com.me.try, echo_user_id).
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
    at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:73)
    at org.apache.flink.statefun.flink.core.functions.FunctionActivation.applyNextPendingEnvelope(FunctionActivation.java:50)
    at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:61)
    at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:161)
    at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:146)
    at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:186)
    at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class com.lambda.User cannot be cast to class org.apache.flink.statefun.sdk.reqreply.generated.TypedValue (com.lambda.User and org.apache.flink.statefun.sdk.reqreply.generated.TypedValue are in unnamed module of loader 'app')
    at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:90)
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
    ... 24 more

Process finished with exit code 1


According to the above exception, it appears that the object being sent is a plain User, which is not a TypedValue, whereas I expected the first map operator, which defines the usersIngress stream, to be enough to correctly set up the data sent to the stateful function. Can you spot something I am doing wrong?

Waiting for a kind reply,
Best regards

Dario Bonino
-- 
Ing. Dario Bonino, Ph.D

e-m@il: [hidden email] 
www: https://www.linkedin.com/in/dariobonino

Re: [Stateful Functions] Help for calling remote stateful function (written in Python)

Igal Shilman
Hello!

Your analysis is correct: what gets passed to the function is whatever is handed to withMessageBody(..).
Starting with StateFun 3.0, a message sent to a remote function needs to be a TypedValue.

You can create an instance of TypedValue manually, or you can add a dependency on the Java SDK and use the MessageBuilder to extract TypedValues.
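
As a rough illustration of the manual option, wrapping a protobuf message in a TypedValue might look like the Kotlin sketch below. It assumes that protobuf-java and the StateFun classes containing the generated TypedValue (the package named in the stack trace) are on the classpath. The helper name toTypedValue is hypothetical, the StringValue payload is only a stand-in for the generated User class, and the typename "com.me.try/user" is an assumed label that has to match whatever the Python side expects.

```kotlin
import com.google.protobuf.Message
import com.google.protobuf.StringValue
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue

// Hypothetical helper: wraps any protobuf message in the TypedValue envelope.
// The typename is a "<namespace>/<name>" label agreed upon with the remote function.
fun toTypedValue(message: Message, typename: String): TypedValue =
    TypedValue.newBuilder()
        .setTypename(typename)            // type label carried next to the payload
        .setHasValue(true)                // marks the payload as present
        .setValue(message.toByteString()) // serialized protobuf bytes
        .build()

fun main() {
    // Stand-in payload; in the pipeline above this would be the User message.
    val payload = StringValue.newBuilder().setValue("42").build()
    val wrapped = toTypedValue(payload, "com.me.try/user")

    // Round trip: the receiver parses the bytes back into the original message type.
    println(wrapped.typename)
    println(StringValue.parseFrom(wrapped.value).value)
}
```

The wrapped value would then be handed to withMessageBody(..) in place of the raw protobuf object.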

I think you are right that the DataStream <-> remote functions integration could be improved; I will file a JIRA issue for that!

A side question: is there a particular reason you chose the DataStream SDK over the other deployment options?

Thanks,
Igal.





On Tue, Apr 27, 2021 at 5:31 PM Bonino Dario <[hidden email]> wrote:
[...]

Re: [Stateful Functions] Help for calling remote stateful function (written in Python)

Bonino Dario

Dear Igal, dear List,

Thank you very much for your reply. Your advice was crucial in overcoming the issue. I have now created a TypedValue manually and successfully managed to communicate with the remote function in Python. Nevertheless, I am still facing strange behavior when invoking the remote function.

I load the messages to send to the remote function from a test CSV file. If the number of messages is high (over 6k), my pipeline receives the function output as expected, except that the last few entries of the file (3 or 4, depending on the run) are missing, even though they appear to be processed on the Python side. However, if I reduce the number of objects sent to the function, say to 1 or 2, I do not see any output from the Kotlin pipeline (of which an excerpt is reported below).

        private val REMOTE_GREET = FunctionType("com.me.try", "echo_user_id")
        private val GREETINGS = EgressIdentifier<TypedValue>(
            "com.me.try",
            "out",
            TypedValue::class.java
        )

        @JvmStatic
        fun main(args: Array<String>) {
            val env = StreamExecutionEnvironment.getExecutionEnvironment()
            env.parallelism = 1
            val usersCsv = env.readTextFile("input/users.csv")
            val users = createUsersStream(usersCsv)
            val statefunConfig = StatefulFunctionsConfig.fromEnvironment(env)
            statefunConfig.factoryType = MessageFactoryType.WITH_KRYO_PAYLOADS
            val usersIngress: DataStream<RoutableMessage> = users.map { payload ->
                RoutableMessageBuilder.builder()
                    .withTargetAddress(REMOTE_GREET, "test")
                    .withMessageBody(
                        payload.f1
                    )
                    .build()
            }

            val predictEgress = StatefulFunctionDataStreamBuilder.builder("test")
                .withDataStreamAsIngress(usersIngress)
                .withRequestReplyRemoteFunction(
                    RequestReplyFunctionBuilder
                        .requestReplyFunctionBuilder(REMOTE_GREET, URI.create("http://127.0.0.1:8000/statefun"))
                        .withMaxRequestDuration(Duration.ofSeconds(15))
                        .withMaxNumBatchRequests(2)

                )
                .withEgressId(GREETINGS)
                .withConfiguration(statefunConfig)
                .build(env)

            val output = predictEgress.getDataStreamForEgressId(GREETINGS)
            val convertedOut = output.map { typedValue -> com.google.protobuf.StringValue.parseFrom(typedValue.value) }
            convertedOut.print()
            env.execute("Hello stateful!!")
        }

        private fun createUsersStream(usersCsv: DataStreamSource<String>): DataStream<Tuple2<Int, TypedValue>> {
            return usersCsv.flatMap { value, out ->
                val fields = value.split("\t")
                val random = 0
                val user = User.newBuilder().apply {
                    this.userId = fields[1].toInt()
                    this.gender = fields[2]
                    this.zipcode = fields[3].toInt()
                    this.ageDesc = fields[4]
                    this.occDesc = fields[5]
                }.build()
                for (i in 0..random) {
                    val ub = TypedValue.newBuilder()
                        .setValue(user.toByteString())
                        .setTypename("com.me.try/user")
                        .setHasValue(true)
                        .build()
                    out.collect(Tuple2(user.userId, ub))
                }
            }
        }
    }

Investigating what happens on the Python side, it appears that when only a few messages are sent the state spec is missing from the received request, even though it is specified in the Python function binding (as reported below),

@functions.bind(typename='com.me.try/echo_user_id',
                specs=[ValueSpec(name='count', type=IntType)])

thus causing the handle_async method to exit before actually calling the function implementation (at line 213); see the excerpt below:

        if res.missing_specs:
            pb_from_function = collect_failure(res.missing_specs)
            return pb_from_function.SerializeToString()


Looking at the RequestReplyFunction.java code on the Kotlin pipeline side, the inner PersistedRemoteFunctionValues managedStates is empty when requests are issued, while it seems to me (tell me if I am wrong) that it should not be. Searching for discrepancies between my implementation and the examples reported in the documentation (see below), I have identified a single call missing in my code: .withPersistedState("<state_name>"). Unfortunately, I cannot make this call in StateFun release 3.0.0, because the RequestReplyFunctionBuilder in that release does not have the corresponding method.

StatefulFunctionEgressStreams egresses =
    StatefulFunctionDataStreamBuilder.builder("example")
        .withDataStreamAsIngress(namesIngress)
        .withRequestReplyRemoteFunction(
            RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
                    REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
                .withPersistedState("seen_count")
                .withMaxRequestDuration(Duration.ofSeconds(15))
                .withMaxNumBatchRequests(500))
        .withEgressId(GREETINGS)
        .build(env);


Am I on the right track? If so, could you please tell me how to perform this operation in release 3.0.0? If you spotted something wrong in what I have done, could you please tell me which steps I got wrong?

Thank you very much,

Best regards

Dario Bonino

On 4/27/21 6:05 PM, Igal Shilman wrote:
Hello!

Your analysis is correct, indeed what is passed is whatever is being handed to withMessageBody(..).
Starting with StateFun 3.0, if you need to send a message to a remote function the message needs to be a TypedValue.

You can create an instance of TypedValue manually, or you can add a dependency on the Java SDK and use the MessageBuilder to
extract TypedValues.
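
As a rough illustration of what "creating a TypedValue manually" amounts to, the sketch below models the envelope in plain Python: a type name, a has-value flag, and the serialized payload bytes. ToyTypedValue, wrap and unwrap are hypothetical names used only for this example and are not part of any StateFun SDK; the type name com.me.try/user is the one used elsewhere in this thread, and the raw bytes stand in for a serialized protobuf User.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyTypedValue:
    """Plain-Python stand-in for TypedValue: a type name plus opaque bytes."""
    typename: str
    has_value: bool
    value: bytes


def wrap(typename: str, payload: bytes) -> ToyTypedValue:
    """Put already-serialized bytes into an envelope tagged with their type name."""
    return ToyTypedValue(typename=typename, has_value=True, value=payload)


def unwrap(expected_typename: str, tv: ToyTypedValue) -> bytes:
    """Return the payload bytes, checking the type name the receiver expects."""
    if tv.typename != expected_typename:
        raise TypeError(f"expected {expected_typename}, got {tv.typename}")
    if not tv.has_value:
        raise ValueError("envelope carries no value")
    return tv.value


# In a real pipeline the payload would be the serialized User message;
# two raw bytes stand in for it here.
envelope = wrap("com.me.try/user", b"\x08\x2a")
payload = unwrap("com.me.try/user", envelope)
```

The point of the envelope is that the sender, not the runtime, is responsible for serializing the object and naming its type, which is why handing a plain User object to withMessageBody(..) ends in a ClassCastException.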

I think that you are right, the DataStream <-> Remote functions could be a little bit improved, I will file a JIRA issue
for that!

A side question: is there a particular reason that you chose to use the DataStream SDK vs. the other deployment options?

Thanks,
Igal.





-- 
Ing. Dario Bonino, Ph.D

e-m@il: [hidden email] 
www: https://www.linkedin.com/in/dariobonino
<foaf:Person>
	<foaf:firstName>Dario</foaf:firstName>
	<foaf:surname>Bonino</foaf:surname>
	<foaf:msnChatID>[hidden email]</foaf:msnChatID>
</foaf:Person> 

Re: [Stateful Functions] Help for calling remote stateful function (written in Python)

Igal Shilman
Hi Bonino,

What you are experiencing is "expected": the finite streaming job is completing too fast. StateFun is designed to run continuously, and
fault tolerance and correctness are achieved by checkpointing its internal state to durable storage.
You can verify this by simply adding a sleep at the end of your main method.
Having said that, I do think we need to improve the way finite jobs are currently supported in StateFun, and I will create a JIRA issue to track this.
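
The timing effect can be illustrated with a small analogy in plain Python. This is only a toy, assuming asynchronous requests that are dropped when the job shuts down; remote_call, run_job and the delay and grace values are hypothetical and do not model the real runtime's shutdown path.

```python
import asyncio


async def remote_call(msg: str, delay: float) -> str:
    """Stands in for the round trip to the remote function."""
    await asyncio.sleep(delay)
    return f"echo:{msg}"


async def run_job(messages: list, delay: float, grace: float) -> list:
    """Toy finite job: fire all requests, wait `grace` seconds, then shut down."""
    tasks = [asyncio.create_task(remote_call(m, delay)) for m in messages]
    await asyncio.sleep(grace)  # the "sleep at the end of main"
    done = [t.result() for t in tasks if t.done()]
    for t in tasks:
        t.cancel()  # shutdown drops whatever is still in flight
    await asyncio.gather(*tasks, return_exceptions=True)
    return done


# Without a grace period the job ends before any reply arrives.
no_wait = asyncio.run(run_job(["u1", "u2"], delay=0.05, grace=0.0))
# With a grace period longer than the round trip, both replies are collected.
with_wait = asyncio.run(run_job(["u1", "u2"], delay=0.05, grace=0.5))
```

With only one or two input records the source finishes almost immediately, so in this analogy nothing comes back unless the job is kept alive for a while after the input is exhausted.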

Thanks for reporting this!


On Wed, Apr 28, 2021 at 5:20 PM Bonino Dario <[hidden email]> wrote:

Dear Igal, dear List

Thank you very much for your reply. Your advice was crucial to overcome the issue. I have now created a TypedValue manually and successfully managed to communicate with the remote function in Python. Nevertheless, I am still facing a strange behavior regarding the invocation of the remote function.

I load the messages to send to the remote function from a test CSV file. If the number of messages is high, i.e., over 6k, my pipeline receives the function output as expected, except that the last entries of the file (3 or 4, depending on the run) are missing, even though they appear to be processed on the Python side. However, if I reduce the number of objects sent to the function, say to 1 or 2, I do not see any output from the Kotlin pipeline (of which an excerpt is reported below).

        private val REMOTE_GREET = FunctionType("com.me.try", "echo_user_id")
        private val GREETINGS = EgressIdentifier<TypedValue>(
            "com.me.try",
            "out",
            TypedValue::class.java
        )

        @JvmStatic
        fun main(args: Array<String>) {
            val env = StreamExecutionEnvironment.getExecutionEnvironment()
            env.parallelism = 1
            val usersCsv = env.readTextFile("input/users.csv")
            val users = createUsersStream(usersCsv)
            val statefunConfig = StatefulFunctionsConfig.fromEnvironment(env)
            statefunConfig.factoryType = MessageFactoryType.WITH_KRYO_PAYLOADS
            val usersIngress: DataStream<RoutableMessage> = users.map { payload ->
                RoutableMessageBuilder.builder()
                    .withTargetAddress(REMOTE_GREET, "test")
                    .withMessageBody(
                        payload.f1
                    )
                    .build()
            }

            val predictEgress = StatefulFunctionDataStreamBuilder.builder("test")
                .withDataStreamAsIngress(usersIngress)
                .withRequestReplyRemoteFunction(
                    RequestReplyFunctionBuilder
                        .requestReplyFunctionBuilder(REMOTE_GREET, URI.create("http://127.0.0.1:8000/statefun"))
                        .withMaxRequestDuration(Duration.ofSeconds(15))
                        .withMaxNumBatchRequests(2)

                )
                .withEgressId(GREETINGS)
                .withConfiguration(statefunConfig)
                .build(env)

            val output = predictEgress.getDataStreamForEgressId(GREETINGS)
            val convertedOut = output.map { typedValue -> com.google.protobuf.StringValue.parseFrom(typedValue.value) }
            convertedOut.print()
            env.execute("Hello stateful!!")
        }

        private fun createUsersStream(usersCsv: DataStreamSource<String>): DataStream<Tuple2<Int, TypedValue>> {
            return usersCsv.flatMap { value, out ->
                val fields = value.split("\t")
                val random = 0
                val user = User.newBuilder().apply {
                    this.userId = fields[1].toInt()
                    this.gender = fields[2]
                    this.zipcode = fields[3].toInt()
                    this.ageDesc = fields[4]
                    this.occDesc = fields[5]
                }.build()
                for (i in 0..random) {
                    val ub = TypedValue.newBuilder()
                        .setValue(user.toByteString())
                        .setTypename("com.me.try/user")
                        .setHasValue(true)
                        .build()
                    out.collect(Tuple2(user.userId, ub))
                }
            }
        }
    }
