Fwd: Not able to query : Queryable State

Fwd: Not able to query : Queryable State

pushpendra.jaiswal

---------- Forwarded message ----------
From: Pushpendra Jaiswal <[hidden email]>
Date: Wed, Aug 31, 2016 at 3:52 PM
Subject: Not able to query : Queryable State
To: [hidden email]


Hi,

I have been trying to use QueryableState in my project, but the query always fails with:

Job 461acde52fae479a6e02c6b2922cd9c1 not found

Both keys (the one in the queryable state and the one in the query) are of the same type and contain the same data, yet I am still not able to fetch the result.

The key class is:
case class Record(val fields: Seq[String], val ab: Double = 0.0, val cd: Double = 0.0, val count: Long = 0) extends Serializable with Addable[Record] with Comparable[Record] {

  override def +(other: Record) = {
    new Record(fields,
      ab + other.ab,
      cd + other.cd,
      count + other.count)
  }

  override def equals(obj: scala.Any): Boolean = {
    obj match {
      case other: Record => fields == other.fields
      case _ => false
    }
  }

  override def hashCode(): Int = {
    var hashCode = 0
    fields.foreach(x => hashCode = hashCode + 31 * x.hashCode)
    // Math.abs(hashCode)
    hashCode
  }

  override def compareTo(o: Record): Int = {
    var i = 0
    i = i + (count - o.count).toInt
    i = i + (ab - o.ab).toInt
    i = i + (cd - o.cd).toInt
    return i
  }
}

Is there a problem with serialization?
Can I not use a complex object as the key (it implements Comparable)?

Thanks and Regards
Pushpendra Jaiswal

Re: Not able to query : Queryable State

Stefan Richter
Hi,

can you provide some more code from your job and a full stack trace for your problem? That would help us to figure out the reason.

Best,
Stefan

Re: Not able to query : Queryable State

vishnuviswanath
Hi,

Are you sure the job ID matches a running job? The error clearly says that the job was not found.

How are you setting the JobID? If I remember correctly, I used something like
val jobId = JobID.fromHexString("myjob_id_here")
to get the JobID object and then passed it to client.getKvState().
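
A minimal sketch of that approach, assuming the hex ID is copied from a running job (for example from the output of bin/flink list) and passed as the first program argument. The object name JobIdParsing and the helper parseJobId are hypothetical and only serve the illustration; JobID.fromHexString is the call mentioned above.

```scala
import org.apache.flink.api.common.JobID
import scala.util.Try

object JobIdParsing {
  // Hypothetical helper: returns None unless the input is exactly 32 hex characters,
  // which is the textual form of a Flink JobID.
  def parseJobId(hex: String): Option[JobID] = {
    val trimmed = hex.trim
    if (trimmed.matches("[0-9a-fA-F]{32}")) Try(JobID.fromHexString(trimmed)).toOption
    else None
  }

  def main(args: Array[String]): Unit = {
    args.headOption.flatMap(parseJobId) match {
      case Some(jobId) => println(s"Parsed job ID: $jobId")
      case None        => println("Usage: JobIdParsing <32-character hex job ID>")
    }
  }
}
```

The resulting JobID is the value that would be handed to client.getKvState() as its first argument. Checking the string first means a placeholder such as "myjob_id_here" is rejected before any query is sent.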

Thanks,
Vishnu

Re: Not able to query : Queryable State

pushpendra.jaiswal
Hi Vishnu

val env = StreamExecutionEnvironment.getExecutionEnvironment
val jobID = env.getStreamGraph.getJobGraph.getJobID


Since I am using the job ID of the currently running job, it should exist.

Thanks and regards
Pushpendra
Re: Not able to query : Queryable State

pushpendra.jaiswal
In reply to this post by Stefan Richter
Hi Stefan

Please find the stack trace and code below:

java.lang.IllegalStateException: Job 81ca41b13e7be8feb99f064e5a9a4237 not found
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1470)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:684)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:123)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Code :

class Aggregator(val stream: KeyedStream[Record, Long]) extends Serializable {

  def reduceFunction = new ReduceFunction[Record] {
    override def reduce(t: Record, t1: Record): Record = {
      val total = t + t1
      total
    }
  }

  val reducingStateDesc = new ReducingStateDescriptor[Record]("record reducing descriptor", reduceFunction, classOf[Record])
//  reducingStateDesc.setQueryable("queryStore")

  def reduceToQueryable = {
    stream.asQueryableState("queryStore", reducingStateDesc)
  }
}

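class FlinkQuery[T](jobID: JobID, val serializer: TypeSerializer[T], jobManagerIP: String, jobManagerPort: Int) extends Serializable with LazyLogging {

  // Scala's annotation is the lowercase @transient; the client is not meant to be serialized.
  @transient
  private lazy val client = new QueryableStateClient(config)

  // config, queryName, keyHash and serializedKey are not shown in this excerpt.
  val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
  kvState.onFailure(onFailure)
  kvState.onSuccess(onSuccess)

  def onFailure = new PartialFunction[Throwable, String] {
    override def isDefinedAt(x: Throwable): Boolean = true
    override def apply(v1: Throwable): String = {
      // Log the failure and return the message so the declared String result type is satisfied.
      val message = "failed to query " + v1.getLocalizedMessage
      logger.error(message)
      message
    }
  }

  def onSuccess = new PartialFunction[Array[Byte], Array[Byte]] {
    // An Array[Byte] never equals Nil, so check for null instead.
    override def isDefinedAt(x: Array[Byte]): Boolean = x != null

    override def apply(v1: Array[Byte]) = {
      // A successful result is not an error; log its size rather than the array reference.
      logger.info("got result of " + v1.length + " bytes")
      v1
    }
  }
}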

class Driver {
  val jobID = env.getStreamGraph.getJobGraph.getJobID
  val aggregatedNQueryable = driver.aggregateWithQueryable(stream)
  val queryStoreName = aggregatedNQueryable.getQueryableStateName
  val serializer = aggregatedNQueryable.getKeySerializer
  val valueSerializer = aggregatedNQueryable.getValueSerializer
}

Re: Not able to query : Queryable State

Till Rohrmann

I think the exception message states the problem: the job simply does not exist. You can verify that by running bin/flink list or by looking it up in the web interface.

The reason is that calling env.getStreamGraph.getJobGraph generates a new JobGraph (not the one that is sent to the JobManager), and this new JobGraph is assigned a new JobID. Thus, the JobGraph you submit to the JobManager and the one you retrieved the JobID from are different.
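
The effect can be sketched as follows, under the assumption that the code runs against a Flink release from the time of this thread with flink-streaming-scala on the classpath; later releases changed how getStreamGraph and getJobGraph behave, so this may not carry over unchanged. The object name JobIdMismatchDemo and the helper twoJobIds are hypothetical. The sketch derives a JobGraph twice from the same environment, using the same call chain as in the question, and compares the two IDs.

```scala
import org.apache.flink.api.common.JobID
import org.apache.flink.streaming.api.scala._

object JobIdMismatchDemo {
  // Builds a trivial topology and derives a JobGraph from it twice.
  // Nothing is submitted to a cluster; env.execute() is never called.
  def twoJobIds(): (JobID, JobID) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(1, 2, 3).print() // at least one operator is needed to build a graph
    val first = env.getStreamGraph.getJobGraph.getJobID
    val second = env.getStreamGraph.getJobGraph.getJobID
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = twoJobIds()
    println(s"first  = $first")
    println(s"second = $second")
    println(s"same   = ${first == second}")
  }
}
```

If the two IDs differ, as the explanation above implies, neither of them can be the ID of the job that was actually submitted. The ID to query with is the one reported by bin/flink list or the web interface.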

Cheers,
Till


--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Not-able-to-query-Queryable-State-tp8808p8938.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.