// Column names for the CSV source, produced by splitting a literal list on fieldDelim.
// NOTE(review): fieldDelim is declared outside this view. The literal is comma-separated,
// so this yields four names only if fieldDelim matches "," -- confirm. String.split
// interprets its argument as a regular expression.
String[] fieldNames = "Column 1,Column 2,Column 3,Column4".split(fieldDelim);
// CSV parsing options that are passed to the CsvTableSource constructor below.
Character quoteCharacter = '"';
boolean ignoreFirstLine = Boolean.TRUE;
String ignoreComments = null;
boolean lenient = Boolean.FALSE;
// Four column types handed to the source together with fieldNames:
// string, int, string, string.
TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
};
// Create the CSV table source and read it as a DataSet of Row in the execution environment.
// path and rowDelim, like fieldDelim, come from outside the visible code.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
CsvTableSource csvTableSource = new CsvTableSource(path,fieldNames,fieldTypes,fieldDelim,
rowDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
DataSet<Row> csvDataSet = csvTableSource.getDataSet(env);
// JDBC output format using the PostgreSQL driver against database "test" on localhost.
// NOTE(review): the query still contains a literal "%s" where the table name belongs and
// nothing visible here formats it -- confirm the table name is substituted before use.
// NOTE(review): the statement names five columns with five "?" placeholders, while the
// CSV source above declares four field types -- confirm the counts are meant to differ.
// NOTE(review): user and password are embedded in the JDBC URL (placeholder values here).
JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://localhost/test?user=xxx&password=xxx")
.setQuery("insert into %s (id, title, author, price, qty) values (?,?,?,?,?)")
.finish();
// Register the JDBC output format as the sink for the CSV rows.
// NOTE(review): no env.execute() call is visible after this line in the shown part of
// the method -- confirm the job is actually triggered.
csvDataSet.output(jdbcOutputFormat);
}