flink one transformation end,the next transformation start

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink one transformation end,the next transformation start

rimin515
hi,all,
i run a job,it is :
---------------------------------------------------------
val data = env.readTextFile("hdfs:///....")//DataSet[(String,Array[String])]
val dataVec = computeDataVect(data)//DataSet[(String,Int,Array[(Int,Double)])]
val rescomm = computeCosSims (dataVec)//DataSet[(String,Array[(String,Double)])]

but when run on the yarn cluster,the result was error,the job can success;and run on the local,in eclipse on my computer,the result is correct.

so,i run twice,
first:
val data = env.readTextFile("hdfs:///....")//DataSet[(String,Array[String])]
val dataVec = computeDataVect(data)//DataSet[(String,Int,Array[(Int,Double)])]
dataVec.writeAsText("hdfs///vec")//the vector is correct,

second:
val readVec = env.readTextFile("hdfs:///vec").map(...)//DataSet[(String,Int,Array[(Int,Double)])]
val rescomm = computeCosSims (dataVec)//DataSet[(String,Array[(String,Double)])]
and the result is correct,is the same as on local,in eclispe.
----------------------------------
someone can solve the problem?


Reply | Threaded
Open this post in threaded view
|

Re: flink one transformation end,the next transformation start

Ufuk Celebi
What is the error message/stack trace you get here?

On Thu, Mar 30, 2017 at 9:33 AM,  <[hidden email]> wrote:

> hi,all,
> i run a job,it is :
> ---------------------------------------------------------
> val data = env.readTextFile("hdfs:///....")//DataSet[(String,Array[String])]
> val dataVec =
> computeDataVect(data)//DataSet[(String,Int,Array[(Int,Double)])]
> val rescomm = computeCosSims
> (dataVec)//DataSet[(String,Array[(String,Double)])]
>
> but when run on the yarn cluster,the result was error,the job can
> success;and run on the local,in eclipse on my computer,the result is
> correct.
>
> so,i run twice,
> first:
> val data = env.readTextFile("hdfs:///....")//DataSet[(String,Array[String])]
> val dataVec =
> computeDataVect(data)//DataSet[(String,Int,Array[(Int,Double)])]
> dataVec.writeAsText("hdfs///vec")//the vector is correct,
>
> second:
> val readVec =
> env.readTextFile("hdfs:///vec").map(...)//DataSet[(String,Int,Array[(Int,Double)])]
> val rescomm = computeCosSims
> (dataVec)//DataSet[(String,Array[(String,Double)])]
> and the result is correct,is the same as on local,in eclispe.
> ----------------------------------
> someone can solve the problem?
>
>