The question is how to encapsulate numerous transformations into one object, or maybe a function, in an Apache Flink Java setting. I have tried to investigate this question using an example of Pi calculation (see below). I am wondering whether the suggested approach is valid from Flink's point of view. It works on one computer; however, I do not know how it will behave in a cluster setup. The code is given below, and the main idea behind it is as follows:
Thus, the code has two ExecutionEnvironment objects: one in main and another in the class classPI.
The whole code is given below. Again, the question is whether this is a valid approach for encapsulating data transformations into a class in a Flink setup that is supposed to be parallelizable to work on a cluster. Is there a better way to hide the details of data transformations? Thanks a lot!
-------------------------The code ----------------------
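In outline, the program looks like this (a reconstruction from the snippets quoted later in the thread; Sampler and SumReducer follow Flink's standard pi-estimation example):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // first ExecutionEnvironment, created in main
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Long> NumIter = env.fromElements(1000000L);
        DataSet<Double> pi = new classPI().compute(NumIter);
        pi.print();
    }

    public static final class classPI {
        public DataSet<Double> compute(final DataSet<Long> NumIter) throws Exception {
            // second ExecutionEnvironment, created inside the class
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // collect() triggers a separate execution to obtain the iteration count
            return env.generateSequence(1, NumIter.collect().get(0))
                    .map(new Sampler())
                    .reduce(new SumReducer())
                    .map(new MapFunction<Long, Double>() {
                        Long N = NumIter.collect().get(0);
                        @Override
                        public Double map(Long count) throws Exception {
                            return count * 4.0 / N;
                        }
                    });
        }
    }

    // samples one random point and reports whether it falls inside the unit circle
    public static final class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // sums the per-sample hits
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}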
On Tuesday, June 7, 2016 6:15 AM, Chesnay Schepler <[hidden email]> wrote:
From what I can tell from your code, you are trying to execute a job within a job. This just doesn't work. Your main method should look like this:
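For example (a sketch; classPI here accepts the environment through its constructor, matching the code shown later in the thread):

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Long> NumIter = env.fromElements(1000000L);
    // the one environment is handed to the class; classPI no longer creates its own
    DataSet<Double> pi = new classPI(env).compute(NumIter);
    pi.print(); // triggers execution of the single job
}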
On 07.06.2016 13:35, Ser Kho wrote:
Chesnay:
1a. The code actually works; that is the point.
1b. What restricts a Flink program from having several execution environments?
2. I am not sure that your modification allows for parallelism. Does it?
3. This code is a simple example of writing/organizing large and complicated programs, where the result of this pi computation needs to be used in further DataSet transformations beyond classPI(). What to do in this case?
Thanks a lot for the suggestions.
On Tuesday, June 7, 2016 8:14 AM, Chesnay Schepler <[hidden email]> wrote:
1a. Ah, yes, I see how it could work, but I wouldn't count on it in a cluster. You would (most likely) run the sub-job (calculating pi) only on a single node.
1b. Different execution environments generally imply different Flink programs.
2. Sure it does, since it's a normal Flink job. Yours, on the other hand, doesn't, since the job calculating pi only runs on a single TaskManager.
3. There are two ways. You can either chain jobs like this (effectively running two Flink programs in succession):
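For instance (a sketch, assuming the classPI(env) variant whose compute(long) builds the pipeline without internal collect() calls, as in the version at the end of this thread):

// first Flink program: compute pi and fetch the value to the client
final ExecutionEnvironment env1 = ExecutionEnvironment.getExecutionEnvironment();
final double piValue = new classPI(env1).compute(1000000L).collect().get(0);

// second Flink program: use the plain double in further transformations
final ExecutionEnvironment env2 = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> length = env2.fromElements(10.0) // radius
        .map(new MapFunction<Double, Double>() {
            @Override
            public Double map(Double radius) throws Exception {
                return 2 * piValue * radius;
            }
        });
length.print();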
or (if all building blocks are Flink programs) build a single job:
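For instance (a sketch; one environment throughout, with pi kept as a DataSet and combined via cross, mirroring the classLengthCircle code later in the thread):

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> pi = new classPI(env).compute(1000000L); // no collect() inside, so no extra job
DataSet<Double> radius = env.fromElements(10.0);
DataSet<Double> length = pi.cross(radius)
        .map(new MapFunction<Tuple2<Double, Double>, Double>() {
            @Override
            public Double map(Tuple2<Double, Double> t) throws Exception {
                return 2 * t.f0 * t.f1;
            }
        });
length.print(); // one job, built and executed once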
On Tuesday, June 7, 2016 8:18 AM, Greg Hogan <[hidden email]> wrote:
"The question is how to encapsulate numerous transformations into one object, or maybe a function, in an Apache Flink Java setting." Implement CustomUnaryOperation. This can then be applied to a DataSet by calling `DataSet result = DataSet.runOperation(new MyOperation<>(...));`.
On 10.06.2016 02:46, Ser Kho wrote:
Below is my code (the first lines closely follow the suggested approach and work OK). The line of concern is the one constructing classLengthCircle, marked by a comment below. The issue is that I do not use env in the constructor of the class classLengthCircle(); instead I use DataSet pi in the method computeLengthCircle(pi, Radius), and also DataSet Radius, but the latter does not matter for the question. Then I proceed with transformations using this DataSet pi; see the class classLengthCircle below. It seems that the logic of this class and its method computeLengthCircle() does not require env at all. My question is whether this code will work on a cluster (it does work on a local computer)?

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> Radius = env.fromElements(10.0);
DataSet<Long> NumIter = env.fromElements(1000000L);

// this line is similar to the suggested one
DataSet<Double> pi = new classPI(env).compute(NumIter);

// this line is somewhat different from the suggested one, as it has no env in the constructor
DataSet<Double> LengthCircle = new classLengthCircle().computeLengthCircle(pi, Radius);

=========================

public static final class classLengthCircle {
    public DataSet<Double> computeLengthCircle(DataSet<Double> pi, DataSet<Double> Radius) {
        DataSet<Double> result = pi.cross(Radius)
            .map(new MapFunction<Tuple2<Double, Double>, Double>() {
                @Override
                public Double map(Tuple2<Double, Double> arg0) throws Exception {
                    return 2 * arg0.f0 * arg0.f1;
                }
            });
        return result;
    }
}
public static final class classPI {
    private final ExecutionEnvironment env;

    public classPI(ExecutionEnvironment env) {
        this.env = env;
    }

    public DataSet<Double> compute(final DataSet<Long> NumIter) throws Exception {
        return this.env.generateSequence(1, NumIter.collect().get(0))
            .map(new Sampler())
            .reduce(new SumReducer())
            .map(new MapFunction<Long, Double>() {
                Long N = NumIter.collect().get(0);
                @Override
                public Double map(Long arg0) throws Exception {
                    return arg0 * 4.0 / N;
                }
            });
    }
}
Q1: Whether one of your classes requires the env parameter depends on whether you want to create a new Source or set an ExecutionEnvironment parameter inside the class. If you don't, you can of course not pass it :) I can't see anything that would prevent it from running on a cluster.

Q2: Usually, parameters are passed to a UDF through the constructor. You can use a DataSet within a function initializer block, but it's rather unusual (this is in fact the first time I've seen it done this way). You can also just pass a long into the constructor; there is no need to use a DataSet and collect().

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> Radius = env.fromElements(10.0);
final long numIter = 1000000L;

DataSet<Double> pi = new classPI(env).compute(numIter);
DataSet<Double> LengthCircle = new classLengthCircle().computeLengthCircle(pi, Radius);

public static final class classPI implements Serializable {
    private final ExecutionEnvironment env;

    public classPI(ExecutionEnvironment env) {
        this.env = env;
    }

    public DataSet<Double> compute(final long numIter) throws Exception {
        return this.env.generateSequence(1, numIter)
            .map(new Sampler())
            .reduce(new SumReducer())
            .map(new MapFunction<Long, Double>() {
                @Override
                public Double map(Long arg0) throws Exception {
                    return arg0 * 4.0 / numIter;
                }
            });
    }
}

Regards,
Chesnay