Hi to all,
I'm writing a job that uses Apache Phoenix. At first I used the PhoenixOutputFormat as the (Hadoop) OutputFormat, but it is not well suited to work with the Table API because it cannot handle generic objects like Rows (it needs a DBWritable object that must already be present at compile time). So I looked into the code of the PhoenixOutputFormat and it is basically nothing more than a JDBCOutputFormat. However, to make it work I had to slightly modify the Flink JDBCOutputFormat, adding a dbConn.commit() after the executeBatch() on the PreparedStatement, e.g.:

upload.executeBatch();
dbConn.commit();

For the moment I've just created a dedicated PhoenixJdbcOutputFormat where I've added these 2 lines of code, starting from the code of the JDBCOutputFormat (it couldn't be extended in this case because all of its fields are private).

What do you think about this? Should I open a ticket to add a connection commit after executeBatch (in order to be compatible with Phoenix), or something else (e.g. create a Phoenix connector that basically extends the JDBCOutputFormat and overrides these 2 methods, changing also the visibility of its fields to protected)?

Best,
Flavio
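P.S. For reference, a minimal sketch of the flush logic in my PhoenixJdbcOutputFormat copy (field and method names follow the JDBCOutputFormat it was copied from, everything else is omitted; consider it illustrative rather than the exact code):

----------------------
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class PhoenixJdbcOutputFormat /* the real class extends RichOutputFormat<Row> */ {

    private Connection dbConn;          // opened in open() via DriverManager
    private PreparedStatement upload;   // the prepared UPSERT statement
    private int batchCount = 0;
    private int batchInterval = 5000;

    private void flush() throws IOException {
        try {
            upload.executeBatch();
            dbConn.commit();   // extra call: Phoenix buffers mutations until commit()
            batchCount = 0;
        } catch (SQLException e) {
            throw new IOException("Flushing the batch to Phoenix failed.", e);
        }
    }
}
----------------------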
Hi,

According to the JavaDocs of java.sql.Connection, commit() will throw an exception if the connection is in auto-commit mode, which should be the default. So adding this change to the JDBCOutputFormat seems a bit risky.

If the Phoenix connector supports auto-commit but does not activate it by default, we can enable it in JDBCOutputFormat.open(). If auto-commit is not supported, we can add a check after executeBatch() and call commit() only if Connection.getAutoCommit() returns false.

Best,
Fabian

2017-09-06 11:38 GMT+02:00 Flavio Pompermaier <[hidden email]>:
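(A minimal sketch of the check Fabian describes — the helper class and method names here are made up for illustration and this is not the actual JDBCOutputFormat code:)

----------------------
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class AutoCommitAwareFlush {
    // Execute the batch and commit explicitly only when the connection
    // does not auto-commit (e.g. Phoenix with its default settings).
    static void flush(PreparedStatement upload, Connection dbConn) throws SQLException {
        upload.executeBatch();
        if (!dbConn.getAutoCommit()) {
            dbConn.commit();
        }
    }
    // Alternatively, auto-commit could be switched on once in open(),
    // provided the driver supports it: dbConn.setAutoCommit(true);
}
----------------------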
Hi Fabian,
thanks for the detailed answer. Obviously you are right :)

As stated by https://phoenix.apache.org/tuning.html, auto-commit is disabled by default in Phoenix, but it can easily be enabled just by appending AutoCommit=true to the connection URL or, equivalently, by setting the proper property in the conf object passed to the Phoenix QueryUtil.getConnectionUrl method that auto-generates the connection URL, i.e.:

----------------------
Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
    PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
final Properties phoenixProps = PropertiesUtil.extractProperties(new Properties(), jobConf);
String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
----------------------

Now my job also works with the standard Flink JDBCOutputFormat. Just to help other people willing to play with Phoenix and HBase, I paste my simple test job below:

----------------------
@Test
public void testPhoenixOutputFormat() throws Exception {
    final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
    senv.enableCheckpointing(5000);
    DataStream<String> testStream = senv.fromElements("1,aaa,XXX", "2,bbb,YYY", "3,ccc,ZZZ");

    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Row map(String str) throws Exception {
            String[] split = str.split(Pattern.quote(","));
            Row ret = new Row(3);
            ret.setField(0, split[0]);
            ret.setField(1, split[1]);
            ret.setField(2, split[2]);
            return ret;
        }
    }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));

    // Set the target Phoenix table and the columns
    Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
    PhoenixMapReduceUtil.setOutput(job, "MY_TABLE", "FIELD_1,FIELD2,FIELD_3");
    final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
    jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
        PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
    final String upsertStatement = PhoenixConfigurationUtil.getUpsertStatement(jobConf);
    final Properties phoenixProps = PropertiesUtil.extractProperties(new Properties(), jobConf);
    String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);

    rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.getCanonicalName())
        .setDBUrl(connUrl)
        .setQuery(upsertStatement)
        .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR, Types.VARCHAR})
        .finish());

    senv.execute();
}
----------------------

Best,
Flavio

On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske <[hidden email]> wrote:
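(For reference, with the settings above the generated connUrl should look roughly like "jdbc:phoenix:localhost:2181;AutoCommit=true" — host and port here are placeholders for the ZooKeeper quorum of the test cluster, not taken from the original mail.)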
Great! If you want to, you can open a PR that adds…

2017-09-06 15:55 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Maybe this should be well documented too... Is there any dedicated page for Flink and the JDBC connectors?
On Wed, Sep 6, 2017 at 4:12 PM, Fabian Hueske <[hidden email]> wrote:
I opened an issue for this: https://issues.apache.org/jira/browse/FLINK-7605
On Wed, Sep 6, 2017 at 4:24 PM, Flavio Pompermaier <[hidden email]> wrote: