Hi Team,
I am using Apache Flink with Java for the below problem statement:
1. read a CSV file with the field delimiter character ';'
2. transform the fields
3. write the data back to CSV
My doubts are as below:
1. If I need to read a CSV file of size above 50 GB, what would be the approach?
2. If I use parallelism, I am not able to split the data and collect it, since it is a CSV file; and while writing back to CSV, Flink creates multiple output files (one per subtask of the default parallelism). How can I achieve a single output file?
Sample input:
000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
Sample output:
000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940
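If I read the sample correctly, the voucher number (third field) gets masked with 'X' characters. A plain-Java sketch of just that masking step (MaskDemo and maskThirdField are illustrative names, not part of my actual job):

```java
public class MaskDemo {

    // Splits a record on ';', replaces every character of the third field
    // (the voucher number) with 'X', and rejoins the fields.
    static String maskThirdField(String line) {
        String[] fields = line.split(";", -1); // -1 keeps trailing empty fields
        if (fields.length > 2) {
            fields[2] = fields[2].replaceAll(".", "X");
        }
        return String.join(";", fields);
    }

    public static void main(String[] args) {
        System.out.println(maskThirdField("000008000077;151139924603;3526358005322;2"));
    }
}
```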
Below is the code which I am currently running in a local environment:
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
package com.ericsson.voucher;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.util.Collector;

public class Classification {

    private static final String INPUT_PATH = "C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv";
    private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv";

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(20);

        long subViewStartTime = System.currentTimeMillis();

        // Each line is read as a single String field; the ';'-separated
        // fields are split manually in DataExtractor.
        DataSet<Tuple1<String>> rawData = env
                .readCsvFile(INPUT_PATH)
                .lineDelimiter("\n")
                .types(String.class);

        DataSet<Tuple8<String, String, String, String, String, String, String, String>> records =
                rawData.flatMap(new DataExtractor()).rebalance();

        // Parallelism 1 on the sink forces a single output file.
        records.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1);

        // writeAsCsv is lazy; the job only runs when execute() is called.
        env.execute("Voucher classification");

        long subViewEndTime = System.currentTimeMillis();
        long subViewDifference = subViewEndTime - subViewStartTime;
        System.out.println("The difference time is " + subViewDifference / 1000 + " seconds");
    }

    public static class DataExtractor extends
            RichFlatMapFunction<Tuple1<String>, Tuple8<String, String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(Tuple1<String> in,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> out)
                throws Exception {
            String[] fields = in.f0.split(";");
            // Guard against short records to avoid ArrayIndexOutOfBoundsException.
            if (fields.length >= 8) {
                String voucherCode = fields[0] + "TEST1";
                String voucherId = fields[1];
                String voucherNumber = fields[2];
                String status = fields[3] + "TWTSTST";
                String startDate = fields[4];
                String endDate = fields[5];
                String endStatus = fields[6];
                String endVoucherNumber = fields[7];
                out.collect(new Tuple8<>(voucherCode, voucherId, voucherNumber, status,
                        startDate, endDate, endStatus, endVoucherNumber));
            }
        }
    }

    public static class RecordReducer implements
            GroupReduceFunction<Tuple8<String, String, String, String, String, String, String, String>,
                    Tuple8<String, String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = -6045821605365596025L;

        @Override
        public void reduce(
                Iterable<Tuple8<String, String, String, String, String, String, String, String>> values,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> out)
                throws Exception {
            // Currently a pass-through: forwards every record unchanged.
            for (Tuple8<String, String, String, String, String, String, String, String> m : values) {
                out.collect(m);
            }
        }
    }
}
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Please help me on the same: how can I achieve the partitioning of the fields on the above data, and use parallelism to increase the throughput of my application?
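To make my intent concrete, below is a minimal sketch of what I think the job should look like. My assumptions (please correct me if wrong): readTextFile lets Flink split the large file into parallel input splits, and calling setParallelism(1) on the sink alone merges everything into one output file. SingleFileSketch and transform are placeholder names.

```java
package com.ericsson.voucher;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SingleFileSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // readTextFile splits the file into input splits, so all parallel
        // subtasks share the read of the 50 GB input.
        DataSet<String> lines = env.readTextFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv");

        // The per-line transform runs at full parallelism.
        DataSet<String> transformed = lines.map(SingleFileSketch::transform);

        // Only the sink runs at parallelism 1 -> one output file.
        transformed.writeAsText("C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv")
                   .setParallelism(1);

        env.execute("single-file sketch");
    }

    // Pure per-line transform: mask the third field, as in the sample output.
    static String transform(String line) {
        String[] f = line.split(";", -1);
        if (f.length > 2) {
            f[2] = f[2].replaceAll(".", "X");
        }
        return String.join(";", f);
    }
}
```

With this shape, only the write is serialized; the read and the transform keep the full parallelism.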