java.io.IOException: Couldn't access resultSet

java.io.IOException: Couldn't access resultSet

David Olsen
Following the sample on the Flink website [1] to test JDBC, I encountered the error "Couldn't access resultSet". It looks like nextRecord() is called before open(). However, I already call open() when I construct the JDBC input format. Are there any functions I should call before job submission?

def jdbc()= {
  val jdbcif = JDBCInputFormat.buildJDBCInputFormat.setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://localhost/test").setQuery("select name from department").setUsername(...).setPassword(...).finish
  jdbcif.open(null)
  jdbcif.asInstanceOf[JDBCInputFormat[Tuple1[String]]]
}

def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment // -> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    val evidence$6 = new TupleTypeInfo(classOf[Tuple1[String]], STRING_TYPE_INFO)
    val stream = env.createInput(jdbc(), evidence$6)
    stream.map ( new MapFunction[Tuple1[String], String]() {
      override def map(tuple: Tuple1[String]): String = tuple.getField(0)
    }).returns(classOf[String]).writeAsText("/path/to/jdbc")
    env.execute("test-flink")
}

The version used in this test is Flink 1.0.3 and Scala 2.11.

[1]. https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/

Re: java.io.IOException: Couldn't access resultSet

Chesnay Schepler
You are not supposed to call open() yourself.
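A minimal sketch of why opening the format on the client may have no effect on the worker. It assumes the format object reaches the worker through Java serialization and keeps its connection state in transient fields; `SketchFormat` and `ShipToWorker` are hypothetical stand-ins, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for an input format; this is not Flink's JDBCInputFormat.
class SketchFormat implements Serializable {
    private static final long serialVersionUID = 1L;

    // Runtime-only state. Java serialization skips transient fields
    // (that the real format stores its state this way is an assumption).
    private transient Object resultSet;

    void open() {
        resultSet = new Object(); // stands in for connecting and running the query
    }

    boolean isOpen() {
        return resultSet != null;
    }
}

class ShipToWorker {
    // Round-trips the object through Java serialization, assumed here to
    // resemble how a format travels from the client to a worker.
    static SketchFormat roundTrip(SketchFormat format)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(format);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SketchFormat) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchFormat client = new SketchFormat();
        client.open(); // opened on the client only
        SketchFormat worker = roundTrip(client);
        System.out.println("client open: " + client.isOpen()); // true
        System.out.println("worker open: " + worker.isOpen()); // false
    }
}
```

Under these assumptions the worker-side copy starts out unopened, so it only becomes usable if the runtime calls open() there.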

On 05.06.2016 11:05, David Olsen wrote:

> Following the sample on the flink website[1] to test jdbc I
> encountered an error "Couldn't access resultSet". It looks like the
> nextRecord is called before open() function. However I've called
> open() when I construct jdbc input format. Any functions I should call
> before job submission?
>
> def jdbc()= {
>   val jdbcif = JDBCInputFormat.buildJDBCInputFormat.setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://localhost/test").setQuery("select name from department").setUsername(...).setPassword(...).finish
>   jdbcif.open(null)
>   jdbcif.asInstanceOf[JDBCInputFormat[Tuple1[String]]]
> }
>
> def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment // -> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>     val evidence$6 = new TupleTypeInfo(classOf[Tuple1[String]], STRING_TYPE_INFO)
>     val stream = env.createInput(jdbc(), evidence$6)
>     stream.map ( new MapFunction[Tuple1[String], String]() {
>       override def map(tuple: Tuple1[String]): String = tuple.getField(0)
>     }).returns(classOf[String]).writeAsText("/path/to/jdbc")
>     env.execute("test-flink")
> }
>
> The version used in this test is flink 1.0.3 and scala 2.11.
>
> [1]. https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/


Re: java.io.IOException: Couldn't access resultSet

David Olsen
I removed the open() call when constructing the JDBC input format, but I still get the "Couldn't access resultSet" error.

Caused by: java.io.IOException: Couldn't access resultSet
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:179)
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:51)
at org.apache.flink.streaming.api.functions.source.FileSourceFunction.run(FileSourceFunction.java:124)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:164)
... 7 more

Is there anything else I should check?

Thanks
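The shape of the trace above (an IOException whose cause is a NullPointerException raised in the same nextRecord() method) is consistent with nextRecord() touching a result set that was never initialized. The following is a minimal sketch of that pattern, assuming the method catches the NullPointerException and rethrows it; `UnopenedFormatSketch` and its sample rows are hypothetical and not taken from the Flink source.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical model of a format whose nextRecord() may run without open().
class UnopenedFormatSketch {
    private Iterator<String> resultSet; // stays null until open() runs

    void open() {
        // Stands in for executing the query; the rows are made up.
        resultSet = Arrays.asList("sales", "research").iterator();
    }

    String nextRecord() throws IOException {
        try {
            return resultSet.next(); // NullPointerException if open() never ran
        } catch (NullPointerException npe) {
            // Rethrow with the NPE attached as the cause, matching the trace shape.
            throw new IOException("Couldn't access resultSet", npe);
        }
    }

    public static void main(String[] args) {
        UnopenedFormatSketch format = new UnopenedFormatSketch();
        try {
            format.nextRecord(); // open() deliberately skipped
        } catch (IOException e) {
            System.out.println(e.getMessage() + " <- " + e.getCause().getClass().getName());
        }
    }
}
```

If this reading is right, the error message points to open() not having run on the instance that serves nextRecord(), rather than to a problem with the query or the database.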



Re: java.io.IOException: Couldn't access resultSet

David Olsen
After switching to org.apache.flink.api.java.ExecutionEnvironment, my code successfully reads data from the database through JDBCInputFormat. But I need streaming mode (and it seems that DataSet and DataStream are not interchangeable). Are there any additional functions that must be called before StreamExecutionEnvironment creates the JDBC input?

Thanks



Re: java.io.IOException: Couldn't access resultSet

Stephan Ewen
Hi David!

You are using the JDBC format that was written for the batch API in the streaming API.

While that should actually work, it is a somewhat new and less tested function. Let's double check that the call to open() is properly forwarded.
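As a rough illustration of what "properly forwarded" means here, the sketch below models a source that drives a format through open, read, and close for each split, and records the call order so it can be checked. The interface is a simplified, hypothetical version of an input-format contract; `SketchInput`, `RecordingInput`, and `SourceDriver` are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical input-format contract (not Flink's interface).
interface SketchInput<T> {
    void open(int split);
    boolean reachedEnd();
    T nextRecord();
    void close();
}

// Records the order of lifecycle calls so a check can confirm open() comes first.
class RecordingInput implements SketchInput<String> {
    final List<String> calls = new ArrayList<>();
    private int remaining;

    @Override public void open(int split) { calls.add("open"); remaining = 2; }
    @Override public boolean reachedEnd() { return remaining == 0; }
    @Override public String nextRecord() { calls.add("nextRecord"); remaining--; return "row"; }
    @Override public void close() { calls.add("close"); }
}

class SourceDriver {
    // For every split: open the format, drain its records, then close it.
    static <T> List<T> run(SketchInput<T> input, int[] splits) {
        List<T> out = new ArrayList<>();
        for (int split : splits) {
            input.open(split);
            while (!input.reachedEnd()) {
                out.add(input.nextRecord());
            }
            input.close();
        }
        return out;
    }
}
```

In this model a driver that receives no splits never calls nextRecord() at all; a driver that reads records without first opening a split would be the kind of forwarding problem to look for.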



Re: java.io.IOException: Couldn't access resultSet

Aljoscha Krettek
The problem could be that open() is not called with a proper Configuration object in streaming mode.


Re: java.io.IOException: Couldn't access resultSet

Chesnay Schepler
The JDBC InputFormat does not use the configuration, and never has.


Re: java.io.IOException: Couldn't access resultSet

David Olsen
After recompiling the 1.0.3 source and testing it, I discovered that InputFormat.open() in FileSourceFunction is not called because splitIterator.hasNext() returns false. It looks like getInputSplits() creates an Iterator<InputSplit> object with its 'exhausted' variable initialized to false. The following statement then opens the input format only if hasNext() returns true, but hasNext() always returns false, so InputFormat.open() is never called. Is the 'exhausted' variable supposed to behave that way (initialized to false, followed by a hasNext() check that unfortunately always returns false)?

I appreciate any suggestions. Thanks.



Re: java.io.IOException: Couldn't access resultSet

Chesnay Schepler
open() not being called is a valuable observation, but from there on I have trouble following you.

Within hasNext() we first check whether exhausted is true, and if so return false. Since it is initialized to false, we will not return at that point.
This seems like correct behaviour.

I have one question though: what parallelism does the source run with?
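The hasNext() logic described above can be sketched as follows. This is a hypothetical model written from the description in this thread, not a copy of the FileSourceFunction source; `SplitIteratorSketch` and its queue-based provider are illustrative names. In this model, hasNext() returning false on the first call does not come from the exhausted flag itself but from the provider handing out no split. One possible way for that to happen, stated here as an assumption, is a subtask that receives no split because there are fewer splits than parallel subtasks.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical model of a lazily fetched split iterator with an 'exhausted' flag.
class SplitIteratorSketch implements Iterator<String> {
    private final Queue<String> provider; // stands in for the split provider
    private String nextSplit;
    private boolean exhausted; // defaults to false

    SplitIteratorSketch(Queue<String> provider) {
        this.provider = provider;
    }

    @Override
    public boolean hasNext() {
        if (exhausted) {
            return false; // not taken on the first call, since exhausted starts as false
        }
        if (nextSplit != null) {
            return true; // a split was already fetched and not yet consumed
        }
        nextSplit = provider.poll(); // null when no split is left for this subtask
        if (nextSplit == null) {
            exhausted = true;
            return false;
        }
        return true;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String split = nextSplit;
        nextSplit = null;
        return split;
    }
}
```

With an empty provider the very first hasNext() returns false, so a caller that opens the format only inside an `if (hasNext())` branch would never open it.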
