Dear List,
I am trying to call a sample stateful function, written in Python with the Stateful Functions Python SDK, from a Flink pipeline. I am building on the SDK examples for Flink DataStream integration, but I am stuck on a type-cast issue that I have not been able to resolve, even after looking through the flink-statefun sources. I am probably doing something wrong.
In the Flink pipeline (an excerpt is shown below, in Kotlin), I load a set of users from a CSV file and create a DataStream<User>, where User is a class generated by protobuf v3. The basic idea is to forward this stream to a remote function (written in Python with the SDK) that unpacks each user object, extracts the user id, and returns it as a String.
    val REMOTE_GREET = FunctionType("com.me.try", "echo_user_id")
    val GREETINGS = EgressIdentifier<String>("com.me.try", "out", String::class.java)

    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        val usersCsv = env.readTextFile("input/users.csv")
        val users = createUsersStream(usersCsv).shuffle()

        val statefunConfig = StatefulFunctionsConfig.fromEnvironment(env)
        statefunConfig.factoryType = MessageFactoryType.WITH_PROTOBUF_PAYLOADS

        val usersIngress: DataStream<RoutableMessage> = users.map { user ->
            RoutableMessageBuilder.builder()
                .withTargetAddress(REMOTE_GREET, user.userId.toString())
                .withMessageBody(user)
                .build()
        }

        val predictEgress = StatefulFunctionDataStreamBuilder.builder("test")
            .withDataStreamAsIngress(usersIngress)
            .withRequestReplyRemoteFunction(
                RequestReplyFunctionBuilder
                    .requestReplyFunctionBuilder(REMOTE_GREET, URI.create("http://127.0.0.1:8000/statefun"))
                    .withMaxRequestDuration(Duration.ofSeconds(15))
                    .withMaxNumBatchRequests(500)
            )
            .withEgressId(GREETINGS)
            .withConfiguration(statefunConfig)
            .build(env)

        val output = predictEgress.getDataStreamForEgressId(GREETINGS)
        output.print()
        env.execute("Hello stateful!!")
    }
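The map operator above addresses each message by a function type plus an instance id (the user id). As a stdlib-only illustration of that addressing, where FunctionTypeSketch, AddressSketch, RoutedUser, targetAddress and routeAll are hypothetical stand-ins rather than the StateFun SDK classes, messages for the same user end up at the same function instance:

```kotlin
// Hypothetical stand-ins for StateFun's FunctionType and Address; not the SDK classes.
data class FunctionTypeSketch(val namespace: String, val name: String)
data class AddressSketch(val type: FunctionTypeSketch, val id: String)

// Hypothetical stand-in for the protobuf-generated User class.
data class RoutedUser(val userId: Long, val name: String)

val REMOTE_GREET_SKETCH = FunctionTypeSketch("com.me.try", "echo_user_id")

// Mirrors .withTargetAddress(REMOTE_GREET, user.userId.toString()): the function type
// selects which function is called, the id selects which instance (and its state).
fun targetAddress(user: RoutedUser): AddressSketch =
    AddressSketch(REMOTE_GREET_SKETCH, user.userId.toString())

// Groups users by the function instance that would receive them,
// preserving the original order within each group.
fun routeAll(users: List<RoutedUser>): Map<AddressSketch, List<RoutedUser>> =
    users.groupBy(::targetAddress)
```

The address only decides which instance is invoked; what that instance receives is whatever object is passed to withMessageBody(...).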
Unfortunately, while the Python function seems to be working (tests built following the Ververica workshop repository on Stateful Functions are up and running consistently) and is listening at the provided address (http://127.0.0.1:8000/statefun), the Kotlin pipeline above fails with a type-cast error. The error occurs before the remote function is actually called, at line 90 of org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.
The reported exception is:
    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
        at akka.dispatch.OnComplete.internal(Future.scala:264)
        at akka.dispatch.OnComplete.internal(Future.scala:261)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
        at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        ... 4 more
    Caused by: org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(com.me.try, echo_user_id).
        at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
        at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:73)
        at org.apache.flink.statefun.flink.core.functions.FunctionActivation.applyNextPendingEnvelope(FunctionActivation.java:50)
        at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:61)
        at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:161)
        at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:146)
        at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:186)
        at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.ClassCastException: class com.lambda.User cannot be cast to class org.apache.flink.statefun.sdk.reqreply.generated.TypedValue (com.lambda.User and org.apache.flink.statefun.sdk.reqreply.generated.TypedValue are in unnamed module of loader 'app')
        at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:90)
        at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
        ... 24 more

    Process finished with exit code 1
Judging from the exception above, the object being sent is a plain User rather than a TypedValue, whereas I expected the map operator that defines the usersIngress stream to be enough to set up the data sent to the stateful function correctly. Can you spot what I am doing wrong?
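One possible reading of the exception is that, for remote functions, the body passed to withMessageBody(...) is expected to already be a TypedValue (the request-reply protocol's envelope, which as far as I understand carries a typename, a has_value flag and the serialized value bytes) rather than the protobuf object itself. The stdlib-only sketch below models that round trip under explicit assumptions: TypedValueSketch, UserSketch, wrapUser, echoUserId, decodeString and both typenames are hypothetical, and the UTF-8 encoding merely stands in for real protobuf serialization.

```kotlin
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for org.apache.flink.statefun.sdk.reqreply.generated.TypedValue,
// modelling only its three fields (typename, has_value, value). Not the SDK class.
class TypedValueSketch(val typename: String, val hasValue: Boolean, val value: ByteArray)

// Hypothetical stand-in for the protobuf-generated com.lambda.User class.
data class UserSketch(val userId: Long) {
    // Assumption: stands in for User.toByteArray(); real protobuf bytes differ.
    fun toBytes(): ByteArray = userId.toString().toByteArray(StandardCharsets.UTF_8)
}

// Assumed typenames; real ones must match what the Python function declares.
const val USER_TYPENAME = "com.me.try/User"
const val STRING_TYPENAME = "com.me.try/String"

// Ingress side: wrap the serialized User instead of passing the raw object as the body.
fun wrapUser(user: UserSketch): TypedValueSketch =
    TypedValueSketch(USER_TYPENAME, hasValue = true, value = user.toBytes())

// Stand-in for the remote echo_user_id function: it receives a TypedValue and
// replies with another TypedValue, so the reply is not a bare String either.
fun echoUserId(input: TypedValueSketch): TypedValueSketch {
    require(input.typename == USER_TYPENAME) { "unexpected type ${input.typename}" }
    val id = String(input.value, StandardCharsets.UTF_8)
    return TypedValueSketch(STRING_TYPENAME, hasValue = true, value = id.toByteArray(StandardCharsets.UTF_8))
}

// Egress side: the String has to be decoded from the reply's value bytes.
fun decodeString(reply: TypedValueSketch): String? =
    if (reply.hasValue) String(reply.value, StandardCharsets.UTF_8) else null
```

With the real generated class, the wrapping step would presumably look like TypedValue.newBuilder().setTypename(...).setHasValue(true).setValue(user.toByteString()).build(), with a typename that matches the type the Python function expects; this is untested. If the reply also arrives as a TypedValue, EgressIdentifier<String> might likewise need to become EgressIdentifier<TypedValue>, with the String decoded afterwards.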
--
Ing. Dario Bonino, Ph.D
Post-doc Researcher, PHPower s.r.l.
www: https://www.linkedin.com/in/dariobonino