Kafka to Flink to Hive - Writes failing

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Kafka to Flink to Hive - Writes failing

sagar loke

I am trying to Sink data to Hive via Confluent Kafka -> Flink -> Hive using following code snippet:

But I am getting following error:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<GenericRecord> stream = readFromKafka(env);


private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
};

 JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("org.apache.hive.jdbc.HiveDriver")
            .setDBUrl("jdbc:<a href="hive2://hiveconnstring" class="">hive2://hiveconnstring")
            .setUsername("myuser")
            .setPassword("mypass")
            .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")
            .setBatchSize(1000)
            .setParameterTypes(FIELD_TYPES)
            .build();

    DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 -> {
                Row row = new Row(2); // 
                row.setField(0, st1.get("SOME_ID")); 
                row.setField(1, st1.get("SOME_ADDRESS"));
                return row;
            });

    sink.emitDataStream(rows);
    env.execute("Flink101");


Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
... 12 more

Caused by: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
... 17 more

I checked hive-jdbc driver and it seems that the Method is not supported in hive-jdbc driver.

public class HiveStatement implements java.sql.Statement {
...

  @Override  
  public int[] executeBatch() throws SQLException {
        throw new SQLFeatureNotSupportedException("Method not supported");
  }

..
}

Is there any way we can achieve this using JDBC Driver ?

Let me know,

Thanks in advance.

Reply | Threaded
Open this post in threaded view
|

Re: Kafka to Flink to Hive - Writes failing

Jörn Franke
Don’t use the JDBC driver to write to Hive. The performance of JDBC in general for large volumes is suboptimal.
Write it to a file in HDFS in a format supported by HIve and point the table definition in Hive to it.

On 11. Jun 2018, at 04:47, sagar loke <[hidden email]> wrote:

I am trying to Sink data to Hive via Confluent Kafka -> Flink -> Hive using following code snippet:

But I am getting following error:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<GenericRecord> stream = readFromKafka(env);


private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
};

 JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("org.apache.hive.jdbc.HiveDriver")
            .setDBUrl("jdbc:<a href="hive2://hiveconnstring" class="">hive2://hiveconnstring")
            .setUsername("myuser")
            .setPassword("mypass")
            .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")
            .setBatchSize(1000)
            .setParameterTypes(FIELD_TYPES)
            .build();

    DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 -> {
                Row row = new Row(2); // 
                row.setField(0, st1.get("SOME_ID")); 
                row.setField(1, st1.get("SOME_ADDRESS"));
                return row;
            });

    sink.emitDataStream(rows);
    env.execute("Flink101");


Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
... 12 more

Caused by: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
... 17 more

I checked hive-jdbc driver and it seems that the Method is not supported in hive-jdbc driver.

public class HiveStatement implements java.sql.Statement {
...

  @Override  
  public int[] executeBatch() throws SQLException {
        throw new SQLFeatureNotSupportedException("Method not supported");
  }

..
}

Is there any way we can achieve this using JDBC Driver ?

Let me know,

Thanks in advance.

Reply | Threaded
Open this post in threaded view
|

Re: Kafka to Flink to Hive - Writes failing

sagar loke
Thanks, 
We are getting data in Avro format from Kafka and are planning to write data in ORC format to Hive tables. 

1. Is BucketingSink better option for this use case or something else ?
2. Is there a sample code example which we can refer ?

Thanks in advance,

On Sun, Jun 10, 2018 at 10:49 PM, Jörn Franke <[hidden email]> wrote:
Don’t use the JDBC driver to write to Hive. The performance of JDBC in general for large volumes is suboptimal.
Write it to a file in HDFS in a format supported by HIve and point the table definition in Hive to it.

On 11. Jun 2018, at 04:47, sagar loke <[hidden email]> wrote:

I am trying to Sink data to Hive via Confluent Kafka -> Flink -> Hive using following code snippet:

But I am getting following error:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<GenericRecord> stream = readFromKafka(env);


private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
};

 JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("org.apache.hive.jdbc.HiveDriver")
            .setDBUrl("jdbc:hive2://hiveconnstring")
            .setUsername("myuser")
            .setPassword("mypass")
            .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")
            .setBatchSize(1000)
            .setParameterTypes(FIELD_TYPES)
            .build();

    DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 -> {
                Row row = new Row(2); // 
                row.setField(0, st1.get("SOME_ID")); 
                row.setField(1, st1.get("SOME_ADDRESS"));
                return row;
            });

    sink.emitDataStream(rows);
    env.execute("Flink101");


Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
... 12 more

Caused by: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
... 17 more

I checked hive-jdbc driver and it seems that the Method is not supported in hive-jdbc driver.

public class HiveStatement implements java.sql.Statement {
...

  @Override  
  public int[] executeBatch() throws SQLException {
        throw new SQLFeatureNotSupportedException("Method not supported");
  }

..
}

Is there any way we can achieve this using JDBC Driver ?

Let me know,

Thanks in advance.




--
Regards,
SAGAR.
Reply | Threaded
Open this post in threaded view
|

Re: Kafka to Flink to Hive - Writes failing

Piotr Nowojski
Yes, BucketingSink is a better option. You can start from looking at the BucketingSink java docs.

Please also take a look on this: 


Alternatively if you do not need to push a lot of data, you could write your own JDBC sink that bases on the JDBCAppendTableSink and adjusting it so that it works with hive’s JDBC client.

Piotrek

On 11 Jun 2018, at 08:12, sagar loke <[hidden email]> wrote:

Thanks, 
We are getting data in Avro format from Kafka and are planning to write data in ORC format to Hive tables. 

1. Is BucketingSink better option for this use case or something else ?
2. Is there a sample code example which we can refer ?

Thanks in advance,

On Sun, Jun 10, 2018 at 10:49 PM, Jörn Franke <[hidden email]> wrote:
Don’t use the JDBC driver to write to Hive. The performance of JDBC in general for large volumes is suboptimal.
Write it to a file in HDFS in a format supported by HIve and point the table definition in Hive to it.

On 11. Jun 2018, at 04:47, sagar loke <[hidden email]> wrote:

I am trying to Sink data to Hive via Confluent Kafka -> Flink -> Hive using following code snippet:

But I am getting following error:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<GenericRecord> stream = readFromKafka(env);


private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
};

 JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("org.apache.hive.jdbc.HiveDriver")
            .setDBUrl("jdbc:hive2://hiveconnstring")
            .setUsername("myuser")
            .setPassword("mypass")
            .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")
            .setBatchSize(1000)
            .setParameterTypes(FIELD_TYPES)
            .build();

    DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 -> {
                Row row = new Row(2); // 
                row.setField(0, st1.get("SOME_ID")); 
                row.setField(1, st1.get("SOME_ADDRESS"));
                return row;
            });

    sink.emitDataStream(rows);
    env.execute("Flink101");


Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
... 12 more

Caused by: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
... 17 more

I checked hive-jdbc driver and it seems that the Method is not supported in hive-jdbc driver.

public class HiveStatement implements java.sql.Statement {
...

  @Override  
  public int[] executeBatch() throws SQLException {
        throw new SQLFeatureNotSupportedException("Method not supported");
  }

..
}

Is there any way we can achieve this using JDBC Driver ?

Let me know,

Thanks in advance.




--
Regards,
SAGAR.