Yes, it is a YARN single job. The command used is:

flink-1.1.1/bin/flink run -m yarn-cluster \
  -yn 2 \
  -ys 2 \
  -yjm 2048 \
  -ytm 2048 \
  --class statics.ComputeDocSim \
  --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
  --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar \
  --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
  --classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar \
  --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar \
  text-assembly-0.1.0.jar hdfs:///user/hadoop/tf-idf-ex hdfs:///user/hadoop/tf-idf-ex-sims

and the code is:

val from = ... // DataSet[(String, Vector)]
val to = from.collect()
val cosDistance = CosineDistanceMetric.apply()
val res = from.map { x =>
  val fromId = x._1
  val docSims = to.filter(_._1 != fromId).map { y =>
    val toId = y._1
    val score = 1 - cosDistance.distance(x._2, y._2)
    (toId, score)
  }.toList.sortWith((x, y) => x._2 > y._2).take(20)
  (fromId, docSims)
}
res.writeAsText(..)
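For comparison, the same top-20 cosine-similarity computation can be expressed with a Flink broadcast variable, so the document vectors are shipped to the task managers inside the job instead of being pulled back to the client with collect(). This is only a sketch: it assumes the Scala DataSet API and flink-ml's CosineDistanceMetric as in the code above, `from` stands for the same DataSet[(String, Vector)], and the broadcast-variable name "docs" is arbitrary.

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.metrics.distances.CosineDistanceMetric

import scala.collection.JavaConverters._

// 'from' is the DataSet[(String, Vector)] from the code above.
val withSims = from
  .map(new RichMapFunction[(String, Vector), (String, List[(String, Double)])] {
    private var docs: Seq[(String, Vector)] = _
    private val cosDistance = CosineDistanceMetric()

    override def open(parameters: Configuration): Unit = {
      // The broadcast set is materialised once per task, not collected on the client.
      docs = getRuntimeContext
        .getBroadcastVariable[(String, Vector)]("docs")
        .asScala
        .toSeq
    }

    override def map(x: (String, Vector)): (String, List[(String, Double)]) = {
      val (fromId, fromVec) = x
      val sims = docs
        .filter(_._1 != fromId)
        .map { case (toId, toVec) => (toId, 1 - cosDistance.distance(fromVec, toVec)) }
        .sortBy(-_._2)
        .take(20)
        .toList
      (fromId, sims)
    }
  })
  .withBroadcastSet(from, "docs")

withSims.writeAsText("hdfs:///user/hadoop/tf-idf-ex-sims")

Note that the broadcast set is still fully materialised on every task manager, so this only helps when the vectors fit in memory; the gain is that the whole pipeline runs as a single job submission.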
----- Original Message -----
From: Stephan Ewen <[hidden email]>
To: [hidden email]
Cc: 亘谷 <[hidden email]>
Subject: Re: flink Broadcast
Date: 2017-03-24 17:40

The program consists of two executions - one that only collects() back to the client, and one that executes the map function.

Are you running this as a "YARN single job" execution? In that case, there may be an issue where this incorrectly tries to submit to a YARN cluster that is already shutting down.

On Fri, Mar 24, 2017 at 10:32 AM, Robert Metzger <[hidden email]> wrote:
Hi Rimin,

I've just tested a Flink application consisting of multiple jobs similar to yours (using collect()) with the `yarn-cluster` option and Flink 1.2.0, and it seemed to work. The YARN cluster was only shut down after the last Flink job had been executed.

Could you maybe test whether your problem still exists with Flink 1.2.0?

Cheers,
Till

On Fri, Mar 24, 2017 at 10:59 AM, <[hidden email]> wrote:
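Stephan's remark about two executions can be seen in isolation with a small, self-contained sketch (assuming the Flink 1.x batch Scala API; the data and output path below are purely illustrative): collect() triggers its own job to ship results to the client, and the sink is only run by the later execute() call, which in per-job YARN mode means a second submission to the cluster.

import org.apache.flink.api.scala._

object TwoExecutionsSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(("a", 1), ("b", 2), ("c", 3))

    // First job: collect() runs the program up to this point and ships the
    // result back to the client.
    val local = data.collect()

    // Second job: the map and the sink below only run when env.execute() is
    // called, i.e. in a separate submission.
    val enriched = data.map { case (id, v) => (id, v, local.size) }
    enriched.writeAsText("hdfs:///tmp/two-executions-demo") // illustrative path
    env.execute("second job")
  }
}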