Apache Flink: Reading CSV Files, Transforming, and Writing Back to CSV Using Parallelism


Apache Flink: Reading CSV Files, Transforming, and Writing Back to CSV Using Parallelism

Lokesh R

Hi Team,

I am using Apache Flink with Java for the following problem statement:

1. Read a CSV file whose field delimiter is the ';' character.
2. Transform the fields.
3. Write the data back to CSV.

My doubts are as below:

1. If I need to read a CSV file larger than 50 GB, what would be the right approach?
2. If I use parallelism, I am not able to split the data and collect it, since it is a CSV file; and when writing back to CSV with the default parallelism it creates multiple output files. How can I handle this?

Sample input:
000008000077;151139924603;3526358005322;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940

Sample output:

000008000077sfhsdfbs;151139924603;XXXXXXXXX;2;29/07/2016:00:00:00;29/07/2018:00:00:00;20;4800019940


Below is the code that I am currently running in a local environment:

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

package com.ericsson.voucher;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple8;

import org.apache.flink.util.Collector;


public class Classification {

    private static final String OUTPUT_PATH = "C:\\Projects\\DM\\Pentaho\\OutPut\\output.csv";

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment
                .getExecutionEnvironment();
        env.setParallelism(20);

        long subViewStartTime = System.currentTimeMillis();

        // Each line is read as a single String field (the data contains no ','),
        // and the fields are split on ';' in DataExtractor below.
        DataSet<Tuple1<String>> rawdata = env
                .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
                .lineDelimiter("\n").types(String.class);

        DataSet<Tuple8<String, String, String, String, String, String, String, String>> mails = rawdata
                .flatMap(new DataExtractor()).rebalance();

        // A sink parallelism of 1 forces a single writer, producing one output file.
        mails.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1);

        // print() triggers execution of the plan.
        mails.print();

        long subViewEndTime = System.currentTimeMillis();
        long subViewDifference = subViewEndTime - subViewStartTime;
        System.out.println("The difference time is " + subViewDifference / 1000 + " seconds");

    }

    public static class DataExtractor
            extends
            RichFlatMapFunction<Tuple1<String>, Tuple8<String, String, String, String, String, String, String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(
                Tuple1<String> paramIN,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> out)
                throws Exception {
            // Split the raw line on the ';' field delimiter.
            String[] lines = paramIN.f0.split(";");
            if (lines != null && lines.length >= 8) {
                String voucherCode = lines[0] + "TEST1";
                String voucherId = lines[1];
                String voucherNumber = lines[2];
                String status = lines[3] + "TWTSTST";
                String startDate = lines[4];
                String endDate = lines[5];
                String endStatus = lines[6];
                String endVoucherNumber = lines[7];

                out.collect(new Tuple8<String, String, String, String, String, String, String, String>(
                        voucherCode, voucherId, voucherNumber, status,
                        startDate, endDate, endStatus, endVoucherNumber));
            }
        }

    }

    public static class RecordReducer
            implements
            GroupReduceFunction<Tuple8<String, String, String, String, String, String, String, String>,
            Tuple8<String, String, String, String, String, String, String, String>> {

   
        private static final long serialVersionUID = -6045821605365596025L;

        @Override
        public void reduce(
                Iterable<Tuple8<String, String, String, String, String, String, String, String>> paramIterable,
                Collector<Tuple8<String, String, String, String, String, String, String, String>> paramCollector)
                throws Exception {
            // Forward every record of the group unchanged.
            for (Tuple8<String, String, String, String, String, String, String, String> m : paramIterable) {
                paramCollector.collect(m);
            }
        }
    }
   

}

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
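For reference, here is a minimal sketch of the other direction I have been considering: letting readCsvFile parse the ';'-separated fields directly instead of reading whole lines and splitting them by hand. This is untested, reuses env and OUTPUT_PATH from the code above, and the input path is just my local example.

        // Parse the eight ';'-separated fields directly while reading.
        DataSet<Tuple8<String, String, String, String, String, String, String, String>> records = env
                .readCsvFile("C:\\Projects\\DM\\Pentaho\\CRV_EXPORT.csv")
                .fieldDelimiter(";")
                .lineDelimiter("\n")
                .types(String.class, String.class, String.class, String.class,
                       String.class, String.class, String.class, String.class);

        // Same transformation as DataExtractor, but on already-parsed fields.
        DataSet<Tuple8<String, String, String, String, String, String, String, String>> transformed = records
                .map(t -> {
                    t.f0 = t.f0 + "TEST1";
                    t.f3 = t.f3 + "TWTSTST";
                    return t;
                })
                .returns(records.getType());

        transformed.writeAsCsv(OUTPUT_PATH, "\n", ";").setParallelism(1);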

Please help me with this: how can I achieve the partitioning of the fields in the above data, and use parallelism to increase the throughput of my application?

Re: Apache Flink: Reading CSV Files, Transforming, and Writing Back to CSV Using Parallelism

rmetzger0
Hi Lokesh,

I'm not sure I fully understood your question, but you cannot write the result to a single file from multiple writers.
If you want to process the data fully distributed, you will also have to write it distributed.
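
For illustration, a minimal sketch of the two write modes (the output paths here are placeholders, and mails is the DataSet from the code above):

// Distributed write: the path becomes a directory containing one part file
// per parallel sink instance, so several writers can work at the same time.
mails.writeAsCsv("file:///tmp/voucher-out", "\n", ";");

// Single-file write: forcing the sink to parallelism 1 produces exactly one
// file, but only one writer does all the output work.
mails.writeAsCsv("file:///tmp/voucher-out-single.csv", "\n", ";").setParallelism(1);

With a sink parallelism greater than 1 you control the number of part files (one per writer), not the size of each file.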


Re: Apache Flink: Reading CSV Files, Transforming, and Writing Back to CSV Using Parallelism

Lokesh Gowda
Hi Robert, my question was: if I need to read and write a CSV file whose size will be in the GB range, how can I distribute the data sink so that it writes files of exactly 1 GB each? Since I am new to Flink I am not sure how to do this; what I have in mind is roughly along the lines of the sketch below.
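
A rough sketch of the write side I have tried so far (untested; the output path and the parallelism value are only examples). As far as I can tell this controls how many part files are produced, not how large each one is:

// With a sink parallelism of N, the output path becomes a directory holding N part files.
// After rebalance() the records are spread round-robin, so for a ~50 GB input a
// parallelism of 50 should give part files of roughly 1 GB each, but the file size
// itself is not something I can set directly.
mails.rebalance()
     .writeAsCsv("file:///data/voucher-out", "\n", ";")
     .setParallelism(50);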


Regards 
Lokesh.r 
