flink Broadcast


flink Broadcast

rimin515
Hi all,
I have 36,000 documents, and each document has been transformed into a vector; one document is one vector, and all vectors have the same dimension, so I have a DataSet:
------------------------
val data: DataSet[(String, SparseVector)] = ... // 36,000 records
val toData = data.collect() // pulls the whole DataSet back to the client
val docSims = data.map { x =>
  val fromId = x._1
  // compare this document against all others and keep the 20 most similar
  val docsims = toData
    .filter { y => y._1 != fromId }
    .map { y =>
      val score = 1 - cosDistance(x._2, y._2)
      (y._1, score)
    }
    .sortWith { (a, b) => a._2 > b._2 }
    .take(20)
    .toList
  (fromId, docsims)
}
docSims.writeAsText(file)
.....
When I run the job on YARN, it fails with the following error:
       java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)


Can someone tell me what is going wrong? Thank you.

Re: flink Broadcast

rmetzger0
Hi,

Can you provide more logs to help us understand what's going on?

One note regarding your application: you are calling .collect() and then sending the collected data back to the cluster inside the closure of the map() call.
This is quite inefficient and can potentially break your application (in particular Flink's RPC system).

I would recommend using broadcast variables to send the dataset to the map operator: https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
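
For illustration, here is a minimal sketch of that pattern with the DataSet API, reusing the SparseVector type and the cosDistance helper from your snippet (both assumed to be in scope); the broadcast set name "docs" is just a placeholder:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

val docSims = data
  .map(new RichMapFunction[(String, SparseVector), (String, List[(String, Double)])] {
    private var toData: Seq[(String, SparseVector)] = _

    override def open(parameters: Configuration): Unit = {
      // The broadcast set is shipped to each task once and fetched here,
      // instead of being serialized into every map closure.
      toData = getRuntimeContext
        .getBroadcastVariable[(String, SparseVector)]("docs")
        .asScala
    }

    override def map(x: (String, SparseVector)): (String, List[(String, Double)]) = {
      val (fromId, fromVec) = x
      val top20 = toData
        .filter(_._1 != fromId)
        .map { case (toId, toVec) => (toId, 1 - cosDistance(fromVec, toVec)) }
        .sortWith(_._2 > _._2)
        .take(20)
        .toList
      (fromId, top20)
    }
  })
  .withBroadcastSet(data, "docs")

This also avoids materializing the whole dataset on the client first, since the collect() call is no longer needed.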




Re: flink Broadcast

Stephan Ewen
The program consists of two executions: one that only collect()s the data back to the client, and one that executes the map function.

Are you running this as a "YARN single job" execution? In that case, there may be an issue where the second execution incorrectly tries to submit to a YARN cluster that is already stopping.
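
To make the two executions concrete, a small sketch with made-up data and an illustrative output path:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements(("a", 1), ("b", 2))

// Execution 1: collect() is eager and submits a job immediately
// to bring the data back to the client.
val local = data.collect()

// Execution 2: sinks defined afterwards only run as a second job,
// triggered by env.execute(); in a "single job" YARN setup the
// cluster may already be shutting down by then.
data.map { case (k, v) => (k, v * 2) }.writeAsText("/tmp/out")
env.execute("second execution")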


