Flink JDBC JDBCOutputFormat Open

Swapnil Chougule
Hi Team,

I want to know how taskNumber and numTasks help when opening the DB connection in Flink's JDBCOutputFormat.open(). The docs say:
taskNumber - The number of the parallel instance.
numTasks - The number of parallel tasks.
But I don't clearly understand the difference between a parallel instance and parallel tasks. How do they contribute to concurrency in the JDBC source/sink?

I also looked at the code but couldn't dig any further:

/**
 * Connects to the target database and initializes the prepared statement.
 *
 * @param taskNumber The number of the parallel instance.
 * @throws IOException Thrown, if the output could not be opened due to an
 *         I/O problem.
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        establishConnection();
        upload = dbConn.prepareStatement(query);
    } catch (SQLException sqe) {
        throw new IllegalArgumentException("open() failed.", sqe);
    } catch (ClassNotFoundException cnfe) {
        throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
    }
}

private void establishConnection() throws SQLException, ClassNotFoundException {
    Class.forName(drivername);
    if (username == null) {
        dbConn = DriverManager.getConnection(dbURL);
    } else {
        dbConn = DriverManager.getConnection(dbURL, username, password);
    }
}

Thanks,
Swapnil
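The two doc phrases describe one concept from two sides. A minimal sketch, assuming nothing from Flink itself: a sink with parallelism N is deployed as N separate instances (subtasks), each working on its own copy of the format and opened once. numTasks is N for every instance, and taskNumber is that instance's own index, from 0 to N - 1. The names SimpleOutputFormat, RecordingOutputFormat and deploy are hypothetical, and the sequential loop simplifies what Flink actually runs concurrently, possibly on different machines.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free simulation of how a runtime hands each parallel
 * instance of an output format its (taskNumber, numTasks) pair.
 * None of these types are Flink classes.
 */
public class ParallelOpenDemo {

    /** Mirrors only the open(int, int) signature quoted from JDBCOutputFormat. */
    interface SimpleOutputFormat {
        void open(int taskNumber, int numTasks);
    }

    /** Records which slot each instance was given. */
    static class RecordingOutputFormat implements SimpleOutputFormat {
        private final List<String> log;

        RecordingOutputFormat(List<String> log) {
            this.log = log;
        }

        @Override
        public void open(int taskNumber, int numTasks) {
            // taskNumber identifies this instance: 0, 1, ..., numTasks - 1.
            // numTasks is identical for every instance: the sink's parallelism.
            log.add("instance " + taskNumber + " of " + numTasks);
        }
    }

    /** Simulates deploying a sink with the given parallelism (sequentially). */
    public static List<String> deploy(int parallelism) {
        List<String> log = new ArrayList<>();
        for (int taskNumber = 0; taskNumber < parallelism; taskNumber++) {
            // Each parallel instance is a separate object, opened exactly once.
            SimpleOutputFormat instance = new RecordingOutputFormat(log);
            instance.open(taskNumber, parallelism);
        }
        return log;
    }

    public static void main(String[] args) {
        deploy(3).forEach(System.out::println);
        // instance 0 of 3
        // instance 1 of 3
        // instance 2 of 3
    }
}
```

A format that needs a per-instance resource (for example, a distinct output file name) could key it on taskNumber; the quoted JDBC code does not use either argument.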



Re: Flink JDBC JDBCOutputFormat Open

Chesnay Schepler
Hello,

The JDBC sink completely ignores the taskNumber and numTasks (parallelism) arguments.

Regards,
Chesnay
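Read against the quoted open() code, this answer has a practical consequence: because both arguments are ignored, every parallel instance runs the same establishConnection() and keeps its own connection. A sink with parallelism N therefore opens N connections, and concurrency comes only from those instances writing in parallel. The sketch below is a hypothetical, dependency-free simulation; FakeConnection stands in for DriverManager.getConnection(...), and JdbcLikeOutputFormat is an illustrative name, not a Flink class.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical simulation (no Flink or JDBC dependency) of what the quoted
 * JDBCOutputFormat.open() implies: the arguments are ignored, so every
 * parallel instance opens its own connection.
 */
public class ConnectionPerInstanceDemo {

    /** Counts how many stand-in "connections" have been created. */
    static final AtomicInteger OPENED = new AtomicInteger();

    /** Stand-in for a java.sql.Connection; creating one bumps the counter. */
    static class FakeConnection {
        FakeConnection() {
            OPENED.incrementAndGet();
        }
    }

    static class JdbcLikeOutputFormat {
        FakeConnection dbConn;

        void open(int taskNumber, int numTasks) {
            // Like the quoted code: taskNumber and numTasks are not used.
            dbConn = new FakeConnection();
        }
    }

    /** Opens 'parallelism' instances and returns how many connections exist. */
    public static int connectionsFor(int parallelism) {
        OPENED.set(0);
        for (int i = 0; i < parallelism; i++) {
            new JdbcLikeOutputFormat().open(i, parallelism);
        }
        return OPENED.get();
    }

    public static void main(String[] args) {
        System.out.println(connectionsFor(4)); // prints 4
    }
}
```

In practice this suggests the database should accept at least as many concurrent connections as the sink's parallelism.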

On 12.09.2016 08:41, Swapnil Chougule wrote:

Re: Flink JDBC JDBCOutputFormat Open

Swapnil Chougule
Thanks, Chesnay, for the update.

On Tue, Sep 13, 2016 at 12:13 AM, Chesnay Schepler wrote: