Flink Scala or Java: data load from CSV to database


ram kumar-2
Hi Guys,

We are in the process of creating a POC.
I am looking for a sample project (Flink, Scala or Java) that can load data from database to database, or from CSV to a relational database (any).


CSV ----------> SQLSERVER  ----------> AWS Redshift

Could someone please help me with that?

Cheers
Ram

Re: Flink Scala or Java: data load from CSV to database

Flavio Pompermaier
I think you can start from this (using the Flink Table API). I hope it helps:

PS: maybe someone could write a blog post on how to do this with Scala, since it's a frequent question on the mailing list... :)
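
// Imports as of Flink 1.1.x; Row and CsvTableSource moved to other packages in later releases.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.CsvTableSource;

public class CsvToJdbc {
    public static void main(String[] args) throws Exception {
        String path = "file:/tmp/myFile.csv";
        String rowDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER;
        String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
        String[] fieldNames = "Column 1,Column 2,Column 3,Column4".split(fieldDelim);

        Character quoteCharacter = '"';
        boolean ignoreFirstLine = true;  // skip the header row
        String ignoreComments = null;    // no comment-line prefix
        boolean lenient = false;         // fail on malformed lines instead of skipping them

        // One type per CSV column, in order
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO
        };

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        CsvTableSource csvTableSource = new CsvTableSource(path, fieldNames, fieldTypes, fieldDelim,
            rowDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
        DataSet<Row> csvDataSet = csvTableSource.getDataSet(env);

        // The query needs a real table name (my_table and col1..col4 are placeholders) and
        // exactly one '?' per CSV field. For SQL Server, use com.microsoft.sqlserver.jdbc.SQLServerDriver.
        JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("org.postgresql.Driver")
            .setDBUrl("jdbc:postgresql://localhost/test?user=xxx&password=xxx")
            .setQuery("insert into my_table (col1, col2, col3, col4) values (?,?,?,?)")
            .finish();
        csvDataSet.output(jdbcOutputFormat);

        // Sinks are lazy: the job only runs when execute() is called.
        env.execute("CSV to JDBC");
    }
}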
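To make the CSV options and the INSERT placeholders above concrete, here is a stdlib-only sketch of what the job does per line: split on the field delimiter while respecting the quote character, skip the header when ignoreFirstLine is set, convert the second column to int, and either drop (lenient) or reject malformed lines. It also builds an INSERT with exactly one '?' per column. This is an illustration under simplifying assumptions (no escaped quotes, fixed 4-column schema), not Flink's CsvInputFormat logic; the class CsvToInsertSketch, its methods and the table/column names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, NOT part of Flink: mimics the CsvTableSource options used above.
public class CsvToInsertSketch {

    // Splits one line on fieldDelim; delimiters inside quotes are kept, quote chars are dropped.
    // Simplification: escaped quotes ("") are not supported.
    static List<String> splitLine(String line, char fieldDelim, char quote) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == quote) {
                inQuotes = !inQuotes;            // enter or leave a quoted section
            } else if (c == fieldDelim && !inQuotes) {
                fields.add(current.toString());  // unquoted delimiter ends the field
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    // Parses lines into rows typed (String, Integer, String, String), matching fieldTypes above.
    // lenient = true drops malformed lines; lenient = false throws on the first one.
    static List<Object[]> parse(List<String> lines, boolean ignoreFirstLine, boolean lenient) {
        List<Object[]> rows = new ArrayList<>();
        for (int n = ignoreFirstLine ? 1 : 0; n < lines.size(); n++) {
            List<String> f = splitLine(lines.get(n), ',', '"');
            try {
                if (f.size() != 4) {
                    throw new IllegalArgumentException("expected 4 fields, got " + f.size());
                }
                rows.add(new Object[] {f.get(0), Integer.parseInt(f.get(1).trim()), f.get(2), f.get(3)});
            } catch (IllegalArgumentException e) { // also catches NumberFormatException
                if (!lenient) {
                    throw new IllegalArgumentException("bad line " + (n + 1) + ": " + lines.get(n), e);
                }
            }
        }
        return rows;
    }

    // Builds "insert into T (c1, c2, ...) values (?, ?, ...)" with one placeholder per column.
    static String buildInsert(String table, List<String> columns) {
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "insert into " + table + " (" + String.join(", ", columns) + ") values (" + placeholders + ")";
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Column 1,Column 2,Column 3,Column4",
                "a,1,\"x, y\",z",
                "b,not-a-number,p,q");
        List<Object[]> rows = parse(lines, true, true);
        System.out.println(rows.size());
        System.out.println(buildInsert("my_table", Arrays.asList("col1", "col2", "col3", "col4")));
    }
}
```

Running main prints 1 (the header is skipped and the line with a non-numeric second column is dropped because lenient is true), followed by `insert into my_table (col1, col2, col3, col4) values (?, ?, ?, ?)`. Generating the placeholders from the column list avoids the mismatch between the number of '?' and the number of CSV fields.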

Best,
Flavio

(In reply to ram kumar's message of Tue, Oct 4, 2016 at 2:18 PM.)