---------- Forwarded message ----------
From: Pushpendra Jaiswal <[hidden email]>
Date: Wed, Aug 31, 2016 at 3:52 PM
Subject: Not able to query : Queryable State
To: [hidden email]

Hi

I have been trying to use QueryableState with my project. It's always failing the query saying. Job 461acde52fae479a6e02c6b2922cd9

Both keys (one in the queryable state and one in the query) are of the same type and contain the same data. Still, I am not able to fetch the result.

Type of key class:

    case class Record(val fields: Seq[String], val ab: Double = 0.0, val cd: Double = 0.0, val count: Long = 0) extends Serializable with Addable[Record] with Comparable[Record]{
|
Hi,
Can you provide some more code from your job and a full stack trace for your problem? That would help us to figure out the reason.

Best,
Stefan
|
Hi,
Are you sure the job ID matches the running job? The error clearly says the JobID was not found. How are you setting the JobID? If I remember correctly, I used something like

    val jobId = JobID.fromHexString("myjob_id_here")

to get the JobID object and used it to call client.getKvState().

Thanks,
Vishnu

On Wed, Aug 31, 2016 at 10:30 AM, Stefan Richter <[hidden email]> wrote:
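Before handing a string to JobID.fromHexString, it can help to check that it has the expected shape. The sketch below assumes a Flink JobID is 16 bytes, i.e. 32 hexadecimal characters; `JobIdHex` and `isValid` are hypothetical helper names, not part of Flink. Note that the ID quoted in the first mail has only 30 characters, which may simply be truncation in the archive.

```scala
// Hypothetical helper, not part of Flink: checks that a job ID string
// has the shape of 32 hexadecimal characters (assumed: 16-byte JobID).
object JobIdHex {

  // Returns true only for exactly 32 hex digits, upper or lower case.
  def isValid(hex: String): Boolean =
    hex != null && hex.matches("[0-9a-fA-F]{32}")

  def main(args: Array[String]): Unit = {
    val fromStackTrace = "81ca41b13e7be8feb99f064e5a9a4237" // ID in the stack trace
    val fromFirstMail  = "461acde52fae479a6e02c6b2922cd9"   // ID as quoted in the first mail
    println(s"$fromStackTrace valid: ${isValid(fromStackTrace)}")
    println(s"$fromFirstMail valid: ${isValid(fromFirstMail)}")
  }
}
```

A string that passes this check is only well-formed; whether a job with that ID is actually running still has to be confirmed against the JobManager.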
|
Hi Vishnu
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val jobID = env.getStreamGraph.getJobGraph.getJobID

I am using the jobId of the currently running job, so it should exist.

Thanks and regards
Pushpendra
|
In reply to this post by Stefan Richter
Hi Stefan
Please find below stack trace and code:

java.lang.IllegalStateException: Job 81ca41b13e7be8feb99f064e5a9a4237 not found
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1470)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:684)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:123)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Code:

    class Aggregator(val stream: KeyedStream[Record, Long]) extends Serializable {

      def reduceFunction = new ReduceFunction[Record] {
        override def reduce(t: Record, t1: Record): Record = {
          val total = t + t1
          total
        }
      }

      val reducingStateDesc = new ReducingStateDescriptor[Record]("record reducing descriptor", reduceFunction, classOf[Record])
      // reducingStateDesc.setQueryable("queryStore")

      def reduceToQueryable = {
        stream.asQueryableState("queryStore", reducingStateDesc)
      }
    }

    class FlinkQuery[T](jobID: JobID, val serializer: TypeSerializer[T],jobManagerIP:String, jobManagerPort:Int) extends Serializable with LazyLogging {

      @Transient private lazy val client = new QueryableStateClient(config)

      val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
      kvState.onFailure(onFailure)
      kvState.onSuccess(onSuccess)

      def onFailure = new PartialFunction[Throwable, String] {
        override def isDefinedAt(x: Throwable): Boolean = true
        override def apply(v1: Throwable): String = {
          logger.error("failed to query " + v1.getLocalizedMessage)
        }
      }

      def onSuccess = new PartialFunction[Array[Byte], Array[Byte]] {
        override def isDefinedAt(x: Array[Byte]): Boolean = x != Nil
        override def apply(v1: Array[Byte]) = {
          logger.error("got result " + v1); v1
        }
      }
    }

    class Driver {
      val jobID = env.getStreamGraph.getJobGraph.getJobID
      val aggregatedNQueryable = driver.aggregateWithQueryable(stream)
      val queryStoreName = aggregatedNQueryable.getQueryableStateName
      val serializer = aggregatedNQueryable.getKeySerializer
      val valueSerializer = aggregatedNQueryable.getValueSerialize
    }
|
I think the exception message says what the problem is. The job simply does not exist. You can verify that by running

The reason is that calling

Cheers,

On Wed, Sep 7, 2016 at 8:07 AM, pushpendra.jaiswal <[hidden email]> wrote:
Hi Stefan
|
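A toy illustration of how a job can "not exist" under the ID used for the query. It assumes that every newly built job graph receives a freshly generated random ID, so an ID read from a graph built on the client side need not equal the ID of the job that was actually submitted. `ToyJobGraph` and `ToyEnvironment` are hypothetical stand-ins for illustration only, not Flink classes, and this is not a statement of how Flink is implemented.

```scala
import java.util.UUID

// Toy stand-in (NOT a Flink class): every instance gets its own random
// 32-character hex ID at construction time.
final class ToyJobGraph {
  val jobId: String = UUID.randomUUID().toString.replace("-", "")
}

// Toy stand-in (NOT a Flink class): each call builds a brand-new graph,
// and therefore yields a brand-new ID.
final class ToyEnvironment {
  def getJobGraph: ToyJobGraph = new ToyJobGraph
}

object JobIdMismatchDemo {
  def main(args: Array[String]): Unit = {
    val env = new ToyEnvironment
    val submitted = env.getJobGraph.jobId // ID of the graph that would be submitted
    val queried   = env.getJobGraph.jobId // ID read later from a second graph
    println(s"submitted=$submitted")
    println(s"queried=$queried")
    println(s"same=${submitted == queried}")
  }
}
```

Under that assumption, the safer route is to take the ID from the running job itself (for example from the JobManager's list of running jobs) and pass it to the query client, as in the JobID.fromHexString suggestion earlier in the thread.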