Strange behavior on filter, group and reduce DataSets
Posted by simone
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/ANNOUNCE-Apache-Flink-1-3-3-released-tp18942p18967.html
Hi all,
I am using Flink 1.3.1 and I have found some strange behavior when
running the following logic:
- Read data from a file and store it in a DataSet<POJO>.
- Split the dataset in two by checking whether "field1" of each POJO
is empty, so that the first dataset contains only elements with a
non-empty "field1" and the second dataset contains the remaining
elements.
- Group each dataset, the first by "field1" and the second by another
field ("field2" in the pseudocode below), then reduce each one.
- Merge the two datasets together by union.
- Write the final dataset as JSON.
What I expected in the output was to find at most one element per
non-empty value of "field1", because:
- Reducing the first dataset, grouped by "field1", should generate
only one element per distinct value of "field1" (see the sketch right
after this list).
- The second dataset should contain only elements with an empty
"field1".
- Taking the union of the two should not duplicate any record.
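For clarity, this is the groupBy/reduce behavior I am relying on. A
minimal, self-contained example (the class name and toy data are mine,
run against a local Flink 1.3.x environment):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class GroupReduceSemantics {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // three records, two distinct keys
        DataSet<Tuple2<String, Integer>> data = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

        // groupBy + reduce should emit exactly one element per key
        data.groupBy(0)
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1,
                                                      Tuple2<String, Integer> t2) {
                    return t1; // keep one arbitrary element per key
                }
            })
            .print(); // prints one record for "a" and one for "b"
    }
}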
This does not happen. When I read the generated JSON files I see some
duplicate (non-empty) values of "field1".
Strangely, this does not happen when the union of the two datasets is
not computed (a sketch of that variant follows). In that case the
first dataset produces only elements with distinct values of "field1",
while the second dataset produces only records with an empty "field1".
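The no-union variant, as a sketch (it uses the names from the
pseudocode further below; the output paths here are hypothetical):

// Each branch gets its own sink; writeAsText() emits POJO.toString()
// rather than JSON, which is enough to spot duplicates.
subjectsWithCondition.writeAsText("/tmp/subjects-with/", WriteMode.OVERWRITE);
subjectsWithoutCondition.writeAsText("/tmp/subjects-without/", WriteMode.OVERWRITE);
env.execute("No-union variant");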
Debugging the code, it seems that the map function used to convert
the final merged dataset into JSON strings starts before the reduce
functions terminate. This seems to produce the duplicates.
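Roughly, this is the kind of logging that shows the interleaving (a
sketch; the println in open() is an illustrative addition and needs
org.apache.flink.configuration.Configuration):

DataSet<String> jsonSubjectsDebug = allSubjects.map(new RichMapFunction<POJO, String>() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void open(Configuration parameters) {
        // logs as soon as this map subtask starts processing
        System.out.println("map subtask "
                + getRuntimeContext().getIndexOfThisSubtask() + " started");
    }

    @Override
    public String map(POJO subject) throws Exception {
        return mapper.writeValueAsString(subject);
    }
});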
Here is my pseudocode:
DataSet<POJO> subjects = ...; // read from CSV (elided)

DataSet<POJO> subjectsWithCondition = subjects
    .filter(new FilterFunction<POJO>() {
        @Override
        public boolean filter(POJO subject) throws Exception {
            // keep only elements with a non-empty "field1"
            // (compare Strings with equals(), not !=)
            return !"".equals(subject.getField("field1"));
        }
    })
    .groupBy("field1")
    .reduce(new ReduceFunction<POJO>() {
        @Override
        public POJO reduce(POJO subject1, POJO subject2) {
            return subject1; // keep one arbitrary element per group
        }
    });

DataSet<POJO> subjectsWithoutCondition = subjects
    .filter(new FilterFunction<POJO>() {
        @Override
        public boolean filter(POJO subject) throws Exception {
            // keep only elements with an empty "field1"
            return "".equals(subject.getField("field1"));
        }
    })
    .groupBy("field2")
    .reduce(new ReduceFunction<POJO>() {
        @Override
        public POJO reduce(POJO subject1, POJO subject2) {
            return subject1; // keep one arbitrary element per group
        }
    });

DataSet<POJO> allSubjects =
    subjectsWithCondition.union(subjectsWithoutCondition);

DataSet<String> jsonSubjects = allSubjects.map(new RichMapFunction<POJO, String>() {
    private static final long serialVersionUID = 1L;
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public String map(POJO subject) throws Exception {
        return mapper.writeValueAsString(subject);
    }
});

jsonSubjects.writeAsText("/tmp/subjects/", WriteMode.OVERWRITE);
env.execute("JSON generation");
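To make the duplicates visible, I also count occurrences per "field1"
value after the union (a diagnostic sketch; the Tuple2 counting step
is an addition of mine, and print() triggers its own execution of the
program):

allSubjects
    .map(new MapFunction<POJO, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(POJO s) throws Exception {
            return Tuple2.of(s.getField("field1"), 1);
        }
    })
    .groupBy(0)
    .sum(1)
    .filter(new FilterFunction<Tuple2<String, Integer>>() {
        @Override
        public boolean filter(Tuple2<String, Integer> t) throws Exception {
            // non-empty keys that appear more than once are duplicates
            return !"".equals(t.f0) && t.f1 > 1;
        }
    })
    .print();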
What is the problem? Did I make a mistake in the filtering, grouping,
or reducing logic?
Thanks in advance,
Simone.