Apache Phoenix integration


Apache Phoenix integration

Flavio Pompermaier
Hi to all,
I'm writing a job that uses Apache Phoenix.

At first I used the PhoenixOutputFormat as a (Hadoop) OutputFormat, but it's not well suited to the Table API because it cannot handle generic objects like Rows (it needs a DBWritable object that must already exist at compile time). So I looked into the code of the PhoenixOutputFormat, and it's basically nothing more than a JDBCOutputFormat.
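
For reference, this is roughly the kind of class the Hadoop PhoenixOutputFormat expects (the record class and its columns are made up, just to illustrate the compile-time requirement that clashes with generic Rows):

----------------------
// Hypothetical record class: the column layout is fixed at compile time
public class MyRecordWritable implements org.apache.hadoop.mapreduce.lib.db.DBWritable {

  private String field1;
  private String field2;

  @Override
  public void write(java.sql.PreparedStatement stmt) throws java.sql.SQLException {
    stmt.setString(1, field1);
    stmt.setString(2, field2);
  }

  @Override
  public void readFields(java.sql.ResultSet rs) throws java.sql.SQLException {
    field1 = rs.getString(1);
    field2 = rs.getString(2);
  }
}
----------------------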

However, to make it work I had to slightly modify the Flink JDBCOutputFormat, adding a dbConn.commit() after the executeBatch() on the PreparedStatement, e.g.:

    upload.executeBatch();
    dbConn.commit();

For the moment I've just created a dedicated PhoenixJdbcOutputFormat where I've added these two lines of code, starting from the code of the JDBCOutputFormat (it couldn't be extended in this case because all of its fields are private).

What do you think about this? Should I open a ticket to add a connection commit after executeBatch (in order to be compatible with Phoenix), or something else (e.g. create a Phoenix connector that basically extends JDBCOutputFormat and overrides two methods, which would also require changing the visibility of its fields to protected)?
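
The second option would look roughly like this (only a rough sketch: it assumes the fields became protected and that the base class exposed some hook after the batch execution, which it currently doesn't):

public class PhoenixJdbcOutputFormat extends JDBCOutputFormat {
  @Override
  protected void flush() throws java.sql.SQLException {
    // execute the pending batch as the base class does today...
    super.flush();
    // ...and then commit explicitly, since Phoenix doesn't auto-commit by default
    dbConn.commit();
  }
}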

Best,
Flavio


Re: Apache Phoenix integration

Fabian Hueske-2
Hi,

According to the JavaDocs of java.sql.Connection, commit() will throw an exception if the connection is in auto-commit mode, which should be the default.
So adding this change to the JdbcOutputFormat seems a bit risky.

Maybe the Phoenix JDBC connector does not enable auto-commit by default (or doesn't support it). Can you check that, Flavio?
If the Phoenix connector supports auto-commit but does not activate it by default, we can enable it in JdbcOutputFormat.open().
If auto-commit is not supported, we can add a check after execute() and call commit() only if Connection.getAutoCommit() returns false.
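
Concretely, the flush would then look something like this (just a sketch, using the field names from your snippet):

    upload.executeBatch();
    if (!dbConn.getAutoCommit()) {
        dbConn.commit();
    }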

Best, Fabian


Re: Apache Phoenix integration

Flavio Pompermaier
Hi Fabian,
thanks for the detailed answer. Obviously you are right :)
As stated by https://phoenix.apache.org/tuning.html, auto-commit is disabled by default in Phoenix, but it can easily be enabled by appending AutoCommit=true to the connection URL or, equivalently, by setting the proper property in the conf object passed to the Phoenix QueryUtil.getConnectionUrl method, which autogenerates the connection URL, i.e.:

----------------------
Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
final Properties phoenixProps = PropertiesUtil.extractProperties(new Properties(), jobConf);
String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
----------------------
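
The generated connection URL then simply carries the extra argument; it ends up looking roughly like this (ZooKeeper host and port are placeholders):

    jdbc:phoenix:zk-host:2181;AutoCommit=true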

Now my job also works with the standard Flink JDBCOutputFormat.
Just to help other people who want to play with Phoenix and HBase, I paste my simple test job below:

  @Test
  public void testPhoenixOutputFormat() throws Exception {

    final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
    senv.enableCheckpointing(5000);
    DataStream<String> testStream = senv.fromElements("1,aaa,XXX", "2,bbb,YYY", "3,ccc,ZZZ");

    // Parse each CSV line into a 3-field Row
    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Row map(String str) throws Exception {
        String[] split = str.split(Pattern.quote(","));
        Row ret = new Row(3);
        ret.setField(0, split[0]);
        ret.setField(1, split[1]);
        ret.setField(2, split[2]);
        return ret;
      }
    }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO));

    // Set the target Phoenix table and its columns
    Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
    PhoenixMapReduceUtil.setOutput(job, "MY_TABLE", "FIELD_1,FIELD2,FIELD_3");
    final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
    // enable Phoenix auto-commit so each executed batch gets committed
    jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
    final String upsertStatement = PhoenixConfigurationUtil.getUpsertStatement(jobConf);
    final Properties phoenixProps = PropertiesUtil.extractProperties(new Properties(), jobConf);
    String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
    
    rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.getCanonicalName())
        .setDBUrl(connUrl)
        .setQuery(upsertStatement)
        .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR, Types.VARCHAR})
        .finish());
    
    senv.execute();
  }

Best,
Flavio


Re: Apache Phoenix integration

Fabian Hueske-2
Great!

If you want to, you can open a PR that adds

if (!conn.getAutoCommit()) {
  conn.setAutoCommit(true);
}

to JdbcOutputFormat.open().
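
In context, open() would then look roughly like the sketch below (the surrounding body is only an approximation, not the exact JdbcOutputFormat code):

public void open(int taskNumber, int numTasks) throws IOException {
  try {
    // establish the connection as before (approximated here)
    Class.forName(drivername);
    dbConn = java.sql.DriverManager.getConnection(dbURL, username, password);
    // Phoenix does not enable auto commits by default, so switch them on explicitly
    if (!dbConn.getAutoCommit()) {
      dbConn.setAutoCommit(true);
    }
    upload = dbConn.prepareStatement(query);
  } catch (java.sql.SQLException | ClassNotFoundException e) {
    throw new IOException("Could not establish the JDBC connection.", e);
  }
}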

Cheers, Fabian



Re: Apache Phoenix integration

Flavio Pompermaier
Maybe this should also be well documented... is there a dedicated page for Flink and JDBC connectors?


Re: Apache Phoenix integration

Flavio Pompermaier
I opened an issue for this: https://issues.apache.org/jira/browse/FLINK-7605
