is dataset immutable in Flink?

Xing Feng
Hi, all,

I have a case where each element in the DataSet is an ArrayList, and I may need to add elements to that ArrayList, so I use a map operator to add them. However, because the DataSet is immutable, every time I use a mapper to add an element, it actually creates another ArrayList and adds the element to that one.

Below is the code as an example.

DataSet<ArrayList<Integer>> dataset0 = env.fromElements(new ArrayList<>(), new ArrayList<>());
DataSet<ArrayList<Integer>> dataset1 = dataset0.map(
    new MapFunction<ArrayList<Integer>, ArrayList<Integer>>() {
      @Override
      public ArrayList<Integer> map(ArrayList<Integer> value) throws Exception {
        value.add(10);
        return value;
      }
    });
DataSet<ArrayList<Integer>> unioned = dataset0.union(dataset1);
unioned.print();

The output is “[] [] [10] [10]”

But I expect “[10] [10] [10] [10]”


Any advice is appreciated.

--
Best Regards.
---
Xing FENG
The University of New South Wales
PhD Student
Mobile: +61 413 8572 88

Re: is dataset immutable in Flink?

Tzu-Li (Gordon) Tai
Hi!

Yes, DataSets and DataStreams in Flink are immutable.
Transformations on them result in new DataSets / DataStreams.

Therefore, the `dataset0` in your case remains ([], []), while the newly transformed `dataset1` is ([10], [10]).
The union of the two is therefore ([], [], [10], [10]).
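
To make the behaviour concrete, here is a minimal plain-Java sketch of the same idea. It is only an analogy and does not use the Flink API: the class `ImmutableMapSketch` and the helpers `mapAppend` and `union` are hypothetical names introduced for illustration. The "map" step builds new lists instead of modifying its input, so the source collection still holds empty lists afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class ImmutableMapSketch {

    // Hypothetical helper: mimics a transformation that leaves its input untouched.
    // It returns a new outer list holding new ArrayLists, each with x appended.
    public static List<ArrayList<Integer>> mapAppend(List<ArrayList<Integer>> source, int x) {
        List<ArrayList<Integer>> result = new ArrayList<>();
        for (ArrayList<Integer> element : source) {
            ArrayList<Integer> copy = new ArrayList<>(element); // copy, do not mutate the input
            copy.add(x);
            result.add(copy);
        }
        return result;
    }

    // Hypothetical helper: concatenates two collections into a new one.
    public static List<ArrayList<Integer>> union(List<ArrayList<Integer>> a,
                                                 List<ArrayList<Integer>> b) {
        List<ArrayList<Integer>> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    public static void main(String[] args) {
        List<ArrayList<Integer>> dataset0 = new ArrayList<>();
        dataset0.add(new ArrayList<>());
        dataset0.add(new ArrayList<>());

        List<ArrayList<Integer>> dataset1 = mapAppend(dataset0, 10);

        // The source is unchanged, so the union mixes old and new elements.
        System.out.println(union(dataset0, dataset1)); // [[], [], [10], [10]]

        // Using only the transformed result gives the output you expected.
        System.out.println(union(dataset1, dataset1)); // [[10], [10], [10], [10]]
    }
}
```

If you want every element of the final result to contain 10, the input of the union has to be the transformed data on both sides. In the Flink program that would presumably mean building the union from `dataset1` (or from another mapped DataSet) rather than from `dataset0`; I have not run that variant, so treat it as an assumption.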

Best,
Gordon

On 29 June 2017 at 3:40:52 PM, Xing Feng wrote the original message shown above.