Need help with JDBC Broken Pipeline Issue after some idle time

Fuyao Li-2

Hi Flink Community,

 

I need some help with the JDBC sink in the DataStream API. I can produce records and sink them to the database correctly. However, if I wait 5 minutes between insertions, I run into a broken pipeline issue. After that, the Flink application restarts, recovers from the checkpoint, and re-executes the failed SQL statement. I have searched hard for resources explaining why such a broken pipeline happens, but I still can’t understand it.

 

The interesting thing is that, if the idle time is around 3 minutes, everything seems to be fine.

 

It seems to be a timeout-related issue, but I don’t know what I should do to fix it. I have shared the sink code. Could anyone share some ideas? Thank you so much!

My environment settings:

Flink version: 1.12.1

Scala version: 2.11

Java version: 11

Flink System parallelism: 1

JDBC Driver: Oracle ojdbc10

Database: Oracle Autonomous Database on Oracle Cloud Infrastructure (you can regard this as a cloud-based Oracle Database)

 

The code for the sink:

        boDataStream
        .addSink(
            JdbcSink.sink(
                "INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",
                (preparedStatement, testInvoiceBo) -> {
                  try {
                      Gson gson = new GsonBuilder()
                              .excludeFieldsWithoutExposeAnnotation()
                              .create();
                      String invoiceId = testInvoiceBo.getINVOICE_ID();
                      String json = gson.toJson(testInvoiceBo);
                      log.info("insertion information: {}", json);
                      preparedStatement.setString(1, invoiceId);
                      preparedStatement.setString(2, json);
                  } catch (JsonIOException e) {
                      log.error("Failed to parse JSON", e);
                  }
                },
                new JdbcExecutionOptions.Builder()
                    .withBatchIntervalMs(0)
                    .withBatchSize(1)
                    .withMaxRetries(3)
                    .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl(DB_URL)
                    .withDriverName("oracle.jdbc.driver.OracleDriver")
                    .withUsername("admin")
                    .withPassword("password")
                    .build()))
        .name("adwSink")
        .uid("adwSink")
        .setParallelism(1);

 

The JDBC broken pipeline log:

 

 


Re: Need help with JDBC Broken Pipeline Issue after some idle time

Fuyao Li-2

Sorry for the incomplete email.

 

Here is the error log for the broken pipeline. The failed SQL statement is re-executed after the automatic recovery from the checkpoint. Please share any ideas on this issue; I would really appreciate it. Thanks!

 

09:20:02,868 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC executeBatch error, retry times = 3

java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)

                at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)

                at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)

                at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)

                at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)

                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

09:20:02,869 WARN  org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing records to JDBC failed.

java.io.IOException: java.sql.SQLRecoverableException: Closed Connection

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)

                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)

                at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)

                at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)

                at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)

                at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)

                ... 11 more

09:20:02,869 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Error closing producer.

java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.KafkaProducer.close(Ljava/time/Duration;)V

                at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:172)

                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:949)

                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

09:20:02,871 WARN  org.apache.flink.runtime.taskmanager.Task                     - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 (b57e84496b38c77e8201536e7d0e3723) switched from RUNNING to FAILED.

java.io.IOException: Writing records to JDBC failed.

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)

                at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)

                at org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)

                at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)

                at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)

                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)

                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)

                at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)

                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.io.IOException: Reestablish JDBC connection failed

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)

                ... 29 more

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)

                at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)

                at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)

                at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)

                at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)

                ... 30 more

09:20:02,872 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 (b57e84496b38c77e8201536e7d0e3723).

09:20:02,878 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FAILED to JobManager for task ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 b57e84496b38c77e8201536e7d0e3723.

09:20:02,880 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1) (b57e84496b38c77e8201536e7d0e3723) switched from RUNNING to FAILED on bc6eadd0-19bf-4924-9e6e-62ba5f239d1e @ localhost (dataPort=-1).

java.io.IOException: Writing records to JDBC failed.

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)

                at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)

                at org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)

                at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)

                at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)

                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)

                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)

                at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)

                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.io.IOException: Reestablish JDBC connection failed

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)

                ... 29 more

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)

                at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)

                at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)

                at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)

                at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)

                ... 30 more

 

 

Thanks,

 

Best regards,

Fuyao

 

From: Fuyao Li <[hidden email]>
Date: Tuesday, March 2, 2021 at 10:33
To: user <[hidden email]>, Timo Walther <[hidden email]>
Subject: Need help with JDBC Broken Pipeline Issue after some idle time



Re: Need help with JDBC Broken Pipeline Issue after some idle time

XU Qinghui-2
It sounds like the JDBC driver's connection is being closed somehow, which probably has nothing to do with Flink itself.
Maybe you could check whether there are settings on the database that close a connection after some period of inactivity. Otherwise, it could be that your network drops inactive TCP connections after some time (in that case you can try enabling TCP keepalive).
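
If the connection really is being dropped while idle, one application-side way to cope is to validate the connection before reusing it and reopen it when validation fails. Below is a minimal sketch of that idea, assuming only the standard JDBC call Connection.isValid(int). The names ValidatingConnectionHolder and ConnectionOpener are hypothetical and are not part of Flink or the Oracle driver. JdbcSink manages its own connection internally, so this would only apply to code that owns its connection, for example a custom sink.

```java
import java.sql.Connection;
import java.sql.SQLException;

/**
 * Hypothetical helper (not part of Flink or the Oracle driver): hands out a
 * JDBC connection and reopens it when the cached one has gone stale.
 */
final class ValidatingConnectionHolder {

    /** Hypothetical factory, e.g. () -> DriverManager.getConnection(url, user, password). */
    interface ConnectionOpener {
        Connection open() throws SQLException;
    }

    private final ConnectionOpener opener;
    private final int validationTimeoutSeconds;
    private Connection current;

    ValidatingConnectionHolder(ConnectionOpener opener, int validationTimeoutSeconds) {
        if (validationTimeoutSeconds < 0) {
            throw new IllegalArgumentException("timeout must be >= 0");
        }
        this.opener = opener;
        this.validationTimeoutSeconds = validationTimeoutSeconds;
    }

    /** Returns a usable connection, reopening it if the cached one fails validation. */
    synchronized Connection get() throws SQLException {
        // Connection.isValid is standard JDBC 4.0; for a closed or broken
        // connection it returns false instead of throwing.
        if (current == null || !current.isValid(validationTimeoutSeconds)) {
            closeQuietly(current);
            current = opener.open();
        }
        return current;
    }

    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException ignored) {
            // The connection is already unusable; nothing more to do.
        }
    }
}
```

A caller would ask the holder for a connection right before preparing or executing a statement, so that a connection closed during a long idle period is replaced before it is used. The validation timeout is an arbitrary choice here and would need tuning for a real deployment.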

BR,


On Tue, Mar 2, 2021 at 19:38, Fuyao Li <[hidden email]> wrote:


                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)

                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.io.IOException: Reestablish JDBC connection failed

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)

                ... 29 more

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)

                at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)

                at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)

                at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)

                at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)

                ... 30 more

09:20:02,872 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 (b57e84496b38c77e8201536e7d0e3723).

09:20:02,878 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FAILED to JobManager for task ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 b57e84496b38c77e8201536e7d0e3723.

09:20:02,880 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1) (b57e84496b38c77e8201536e7d0e3723) switched from RUNNING to FAILED on bc6eadd0-19bf-4924-9e6e-62ba5f239d1e @ localhost (dataPort=-1).

java.io.IOException: Writing records to JDBC failed.

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)

                at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)

                at org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)

                at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)

                at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)

                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)

                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)

                at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)

                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.io.IOException: Reestablish JDBC connection failed

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)

                ... 29 more

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)

                at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)

                at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)

                at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)

                at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)

                ... 30 more

 

 

Thanks,

 

Best regards,

Fuyao

 

From: Fuyao Li <[hidden email]>
Date: Tuesday, March 2, 2021 at 10:33
To: user <[hidden email]>, Timo Walther <[hidden email]>
Subject: Need help with JDBC Broken Pipeline Issue after some idle time

Hi Flink Community,

 

I need some help with the JDBC sink in the DataStream API. I can produce some records and sink them to the database correctly. However, if I wait for 5 minutes between insertions, I run into a broken pipeline issue. After that, the Flink application restarts, recovers from the checkpoint, and executes the failed SQL query. I tried hard to find resources explaining why such a broken pipeline happens, but I still can’t understand it.

 

The interesting thing is that if the idle time is around 3 minutes, everything seems to be fine.

 

It seems to be a timeout-related issue, but I just don’t know what I should do to fix it. I have shared the sink code. Could anyone share some ideas? Thank you so much!

My environment settings:

Flink version: 1.12.1

Scala version: 2.11

Java version: 1.11

Flink System parallelism: 1

JDBC Driver: Oracle ojdbc10

Database: Oracle Autonomous Database on Oracle Cloud Infrastructure (you can regard this as a cloud-based Oracle Database)

 

The code for the sink:

        boDataStream

        .addSink(

            JdbcSink.sink(

                "INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",

                (preparedStatement, testInvoiceBo) -> {

                  try {

                      Gson gson = new GsonBuilder()

                              .excludeFieldsWithoutExposeAnnotation()

                              .create();

                      String invoiceId = testInvoiceBo.getINVOICE_ID();

                      String json = gson.toJson(testInvoiceBo);

                      log.info("insertion information: {}", json);

                      preparedStatement.setString(1, invoiceId);

                      preparedStatement.setString(2, json);

                  } catch (JsonIOException e) {

                      log.error("Failed to parse JSON", e);

                  }

                },

                new JdbcExecutionOptions.Builder()

                .withBatchIntervalMs(0)

                .withBatchSize(1)

                .withMaxRetries(3)

                .build(),

                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

                    .withUrl(DB_URL)

                    .withDriverName("oracle.jdbc.driver.OracleDriver")

                    .withUsername("admin")

                    .withPassword("password")

                    .build()))

        .name("adwSink")

        .uid("adwSink")

        .setParallelism(1);

 

The JDBC broken pipeline log:

 

 


Re: [External] : Re: Need help with JDBC Broken Pipeline Issue after some idle time

Fuyao Li-2

Hi Qinghui,

 

I agree. I have been searching internal resources and the internet for a way to fix the issue. Idle time limits might be the reason, but I configured those parameters and updated sqlnet.ora to the following:

WALLET_LOCATION = (SOURCE = (METHOD = file) (METHOD_DATA = (DIRECTORY="… ")))
SSL_SERVER_DN_MATCH=yes
NAMES.DIRECTORY_PATH=(ezconnect,tnsnames)
SQLNET.USE_HTTPS_PROXY=on
DISABLE_OOB=on
SQLNET.RECV_TIMEOUT = 7200
BEQUEATH_DETACH = YES
SQLNET.EXPIRE_TIME = 1
SQLNET.SEND_TIMEOUT = 7200
SQLNET.INBOUND_CONNECT_TIMEOUT = 7200

 

SQLNET.EXPIRE_TIME acts as a kind of heartbeat to keep the connection alive.
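
If the sqlnet.ora heartbeat does not take effect for this connection, the same idea can be sketched on the application side. This is only a minimal sketch under assumptions: `IdleHeartbeat` is a hypothetical helper (not part of Flink or the Oracle driver), and the `ping` it runs would be something cheap such as `SELECT 1 FROM DUAL`, scheduled at an interval shorter than the idle time that was observed to be safe (around 3 minutes in this thread).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper: runs a cheap "ping" task at a fixed interval so that
 * an otherwise idle connection keeps seeing traffic.
 */
public class IdleHeartbeat implements AutoCloseable {

    // Single daemon thread so the heartbeat never blocks JVM shutdown.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "jdbc-idle-heartbeat");
                t.setDaemon(true);
                return t;
            });

    public IdleHeartbeat(Runnable ping, long interval, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                ping.run();
            } catch (RuntimeException e) {
                // scheduleAtFixedRate cancels all further runs if a run throws,
                // so swallow the failure here and try again on the next tick.
            }
        }, interval, interval, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The ping would have to use the same connection as the writer, so access to that connection needs to be synchronized between the two threads. With the built-in JdbcSink the connection is managed internally, so this would only apply to a hand-written sink.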

 

It still doesn’t work after all of these configurations. Pretty weird…

 

I will post a follow-up if I find the answer… Thanks.

 

BR,

Fuyao

 

 

From: XU Qinghui <[hidden email]>
Date: Tuesday, March 2, 2021 at 13:40
To: Fuyao Li <[hidden email]>
Cc: user <[hidden email]>, Timo Walther <[hidden email]>
Subject: [External] : Re: Need help with JDBC Broken Pipeline Issue after some idle time

It sounds like the JDBC driver's connection is closed somehow, which probably has nothing to do with Flink itself.
Maybe you could check whether there are settings on the DB that close the connection after some inactivity. Otherwise, it could be that your network drops the inactive TCP connection after some time (you can try TCP keepalive in this case).
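
To illustrate the "connection closed after inactivity" case in plain JDBC, here is a minimal sketch, assuming you manage the connection yourself. `ValidatingConnectionHolder` and `ConnectionFactory` are hypothetical names made up for this example; the only real API used is the standard `java.sql.Connection.isValid(int)`. The idea is to check the connection before each use and open a fresh one if the old one no longer responds.

```java
import java.sql.Connection;
import java.sql.SQLException;

/**
 * Hypothetical helper: hands out a connection, replacing it with a fresh one
 * whenever the current one no longer passes Connection.isValid().
 */
public class ValidatingConnectionHolder {

    /** Hypothetical factory, e.g. a lambda wrapping DriverManager.getConnection(...). */
    @FunctionalInterface
    public interface ConnectionFactory {
        Connection open() throws SQLException;
    }

    private final ConnectionFactory factory;
    private final int validationTimeoutSeconds;
    private Connection current;

    public ValidatingConnectionHolder(ConnectionFactory factory, int validationTimeoutSeconds) {
        this.factory = factory;
        this.validationTimeoutSeconds = validationTimeoutSeconds;
    }

    /** Returns a connection that passed isValid() just now, reopening if necessary. */
    public synchronized Connection get() throws SQLException {
        if (current == null || !current.isValid(validationTimeoutSeconds)) {
            closeQuietly(current);
            current = factory.open();
        }
        return current;
    }

    private static void closeQuietly(Connection c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (SQLException ignored) {
            // The old connection is already broken; nothing more to do with it.
        }
    }
}
```

A factory could be something like `() -> DriverManager.getConnection(DB_URL, "admin", "password")`. This does not stop the database or the network from dropping idle connections; it only means the next write starts from a working connection instead of failing on a closed one.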

 

BR,

 

 

On Tue, Mar 2, 2021 at 19:38, Fuyao Li <[hidden email]> wrote:

Sorry for the incomplete email.

 

Below is the error log of the broken pipeline. The failed SQL is executed again after the automatic recovery from the checkpoint. Please share some ideas on this issue. I really appreciate it. Thanks!

 

09:20:02,868 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC executeBatch error, retry times = 3

java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)

                at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)

                at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)

                at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)

                at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)

                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

09:20:02,869 WARN  org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing records to JDBC failed.

java.io.IOException: java.sql.SQLRecoverableException: Closed Connection

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)

                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)

                at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)

                at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)

                at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)

                at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)

                ... 11 more

09:20:02,869 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Error closing producer.

java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.KafkaProducer.close(Ljava/time/Duration;)V

                at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:172)

                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:949)

                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

09:20:02,871 WARN  org.apache.flink.runtime.taskmanager.Task                     - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 (b57e84496b38c77e8201536e7d0e3723) switched from RUNNING to FAILED.

java.io.IOException: Writing records to JDBC failed.

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)

                at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)

                at org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)

                at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)

                at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)

                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)

                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)

                at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)

                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.io.IOException: Reestablish JDBC connection failed

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)

                ... 29 more

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)

                at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)

                at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)

                at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)

                at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)

                ... 30 more

09:20:02,872 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 (b57e84496b38c77e8201536e7d0e3723).

09:20:02,878 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FAILED to JobManager for task ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 b57e84496b38c77e8201536e7d0e3723.

09:20:02,880 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1) (b57e84496b38c77e8201536e7d0e3723) switched from RUNNING to FAILED on bc6eadd0-19bf-4924-9e6e-62ba5f239d1e @ localhost (dataPort=-1).

java.io.IOException: Writing records to JDBC failed.

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)

                at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)

                at org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)

                at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)

                at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)

                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)

                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)

                at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)

                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.io.IOException: Reestablish JDBC connection failed

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)

                ... 29 more

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)

                at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)

                at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)

                at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)

                at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)

                ... 30 more

 

 

Thanks,

 

Best regards,

Fuyao

 

From: Fuyao Li <[hidden email]>
Date: Tuesday, March 2, 2021 at 10:33
To: user <[hidden email]>, Timo Walther <[hidden email]>
Subject: Need help with JDBC Broken Pipeline Issue after some idle time

Hi Flink Community,

 

I need some help with the JDBC sink in the DataStream API. I can produce records and sink them to the database correctly. However, if I wait 5 minutes between insertions, I run into a broken pipeline issue. After that, the Flink application restarts, recovers from the checkpoint, and re-executes the failed SQL statement. I have searched hard for resources explaining why such a broken pipeline happens, but I still can't understand it.

 

Interestingly, if the idle time is around 3 minutes, everything seems to work fine.

 

It seems to be a timeout-related issue, but I don't know what I should do to fix it. I have shared the sink code below. Could anyone share some ideas? Thank you so much!

My environment settings:

Flink version: 1.12.1

Scala version: 2.11

Java version: 11

Flink System parallelism: 1

JDBC Driver: Oracle ojdbc10

Database: Oracle Autonomous Database on Oracle Cloud Infrastructure (you can regard this as a cloud-based Oracle Database)

 

The code for the sink:

        boDataStream
        .addSink(
            JdbcSink.sink(
                "INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",
                (preparedStatement, testInvoiceBo) -> {
                  try {
                      // Serialize only the fields annotated with @Expose.
                      Gson gson = new GsonBuilder()
                              .excludeFieldsWithoutExposeAnnotation()
                              .create();
                      String invoiceId = testInvoiceBo.getINVOICE_ID();
                      String json = gson.toJson(testInvoiceBo);
                      log.info("insertion information: {}", json);
                      // Bind the two placeholders of the INSERT statement.
                      preparedStatement.setString(1, invoiceId);
                      preparedStatement.setString(2, json);
                  } catch (JsonIOException e) {
                      log.error("Failed to parse JSON", e);
                  }
                },
                // Batch size 1: every record is flushed on its own; up to 3 retries per flush.
                new JdbcExecutionOptions.Builder()
                .withBatchIntervalMs(0)
                .withBatchSize(1)
                .withMaxRetries(3)
                .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl(DB_URL)
                    .withDriverName("oracle.jdbc.driver.OracleDriver")
                    .withUsername("admin")
                    .withPassword("password")
                    .build()))
        .name("adwSink")
        .uid("adwSink")
        .setParallelism(1);

 

The JDBC broken pipeline log:

 

 


Re: [External] : Re: Need help with JDBC Broken Pipeline Issue after some idle time

Fuyao Li-2

Hi Flink Community,

 

After configuring the JDBC timeout settings, I still could not get rid of the issue.

https://issues.apache.org/jira/browse/FLINK-21674

I created a JIRA ticket describing the problem. Any suggestions are appreciated.

 

Best regards,

Fuyao

 

From: Fuyao Li <[hidden email]>
Date: Wednesday, March 3, 2021 at 15:14
To: XU Qinghui <[hidden email]>
Cc: user <[hidden email]>, Timo Walther <[hidden email]>
Subject: Re: [External] : Re: Need help with JDBC Broken Pipeline Issue after some idle time

Hi Qinghui,

 

I agree. I am trying to find internal resources and resources on the internet to fix the issue. Idle time limits might be a reason. I have configured those parameters and updated sqlnet.ora to:

WALLET_LOCATION = (SOURCE = (METHOD = file) (METHOD_DATA = (DIRECTORY="… ")))
SSL_SERVER_DN_MATCH=yes
NAMES.DIRECTORY_PATH=(ezconnect,tnsnames)
SQLNET.USE_HTTPS_PROXY=on
DISABLE_OOB=on
SQLNET.RECV_TIMEOUT = 7200
BEQUEATH_DETACH = YES
SQLNET.EXPIRE_TIME = 1
SQLNET.SEND_TIMEOUT = 7200
SQLNET.INBOUND_CONNECT_TIMEOUT = 7200

 

SQLNET.EXPIRE_TIME acts as a kind of heartbeat: a probe is sent at the configured interval (in minutes) to keep the connection alive.
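
For comparison, the same heartbeat idea can be applied on the client side by running a cheap statement at an interval shorter than the idle limit (this thread reports that about 3 minutes of idle time is fine while 5 minutes is not). The following is only a minimal sketch under assumptions: the class name `ConnectionHeartbeat` is hypothetical, the ping is passed in as a `Runnable` (for Oracle it might execute something like `SELECT 1 FROM DUAL`), and it does not describe how `SQLNET.EXPIRE_TIME` itself works, nor does it reach the connection that Flink's `JdbcSink` manages internally.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs a cheap "ping" repeatedly so a connection never sits idle for long. */
final class ConnectionHeartbeat implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "jdbc-heartbeat");
                t.setDaemon(true); // do not keep the JVM alive just for the heartbeat
                return t;
            });

    ConnectionHeartbeat(Runnable ping, long interval, TimeUnit unit) {
        // Fixed delay: the next ping is scheduled only after the previous one has finished.
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                ping.run();
            } catch (RuntimeException e) {
                // Swallowed so that one failed ping does not cancel later runs;
                // real code should log the failure.
            }
        }, interval, interval, unit);
    }

    /** Stops the heartbeat thread. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The interval would have to be chosen below whatever idle limit actually applies, for example 1 or 2 minutes if connections are dropped somewhere between 3 and 5 minutes of inactivity.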

 

It still doesn’t work with all of these settings in place. Pretty weird…

 

I will post a follow-up if I find the answer. Thanks.

 

BR,

Fuyao

 

 

From: XU Qinghui <[hidden email]>
Date: Tuesday, March 2, 2021 at 13:40
To: Fuyao Li <[hidden email]>
Cc: user <[hidden email]>, Timo Walther <[hidden email]>
Subject: [External] : Re: Need help with JDBC Broken Pipeline Issue after some idle time

It sounds like the JDBC driver's connection is being closed somehow, which probably has nothing to do with Flink itself.
Maybe you could check whether some setting on the database closes connections after a period of inactivity. Otherwise, your network may be dropping the inactive TCP connection after some time (in that case you can try TCP keepalive).
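
One way to act on this suggestion at the application level is to validate a connection before reusing it after an idle period. The sketch below is only an illustration built on the standard JDBC call `Connection.isValid(int)`; the class name `ValidatingConnectionHolder` and the supplier-based factory are hypothetical and are not part of Flink or the Oracle driver. It does not change how Flink's `JdbcSink` handles its own connection; it only shows the check-then-reconnect idea.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.function.Supplier;

/** Hypothetical helper: hands out a JDBC connection and replaces it once it is no longer valid. */
final class ValidatingConnectionHolder {
    private final Supplier<Connection> factory; // assumed to open a fresh connection on each call
    private final int validationTimeoutSeconds;
    private Connection current;

    ValidatingConnectionHolder(Supplier<Connection> factory, int validationTimeoutSeconds) {
        this.factory = factory;
        this.validationTimeoutSeconds = validationTimeoutSeconds;
    }

    /** Returns the cached connection if it still passes isValid, otherwise opens a new one. */
    synchronized Connection get() {
        if (current != null && isUsable(current)) {
            return current;
        }
        closeQuietly(current); // the stale connection may already be closed by the server
        current = factory.get();
        return current;
    }

    private boolean isUsable(Connection c) {
        try {
            return !c.isClosed() && c.isValid(validationTimeoutSeconds);
        } catch (SQLException e) {
            return false; // treat any error during validation as "not usable"
        }
    }

    private static void closeQuietly(Connection c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (SQLException ignored) {
            // Closing a dead connection can itself fail; nothing more to do here.
        }
    }
}
```

In a real job the supplier would wrap `DriverManager.getConnection(...)` and rethrow any `SQLException` as an unchecked exception.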

 

BR,

 

 

On Tue, Mar 2, 2021 at 19:38, Fuyao Li <[hidden email]> wrote:

Sorry for the incomplete email.

 

Below is the error log for the broken pipeline. The failed SQL is re-executed after automatic recovery from the checkpoint. Please share any ideas on this issue. I really appreciate it. Thanks!

 

09:20:02,868 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC executeBatch error, retry times = 3

java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)

                at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)

                at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)

                at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)

                at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)

                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

09:20:02,869 WARN  org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing records to JDBC failed.

java.io.IOException: java.sql.SQLRecoverableException: Closed Connection

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)

                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)

                at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)

                at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)

                at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)

                at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)

                ... 11 more

09:20:02,869 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Error closing producer.

java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.KafkaProducer.close(Ljava/time/Duration;)V

                at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:172)

                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:949)

                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

09:20:02,871 WARN  org.apache.flink.runtime.taskmanager.Task                     - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 (b57e84496b38c77e8201536e7d0e3723) switched from RUNNING to FAILED.

java.io.IOException: Writing records to JDBC failed.

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)

                at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)

                at org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)

                at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)

                at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)

                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)

                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)

                at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)

                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.io.IOException: Reestablish JDBC connection failed

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)

                ... 29 more

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)

                at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)

                at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)

                at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)

                at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)

                ... 30 more

09:20:02,872 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 (b57e84496b38c77e8201536e7d0e3723).

09:20:02,878 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FAILED to JobManager for task ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1)#0 b57e84496b38c77e8201536e7d0e3723.

09:20:02,880 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: distributions-notification) (1/1) (b57e84496b38c77e8201536e7d0e3723) switched from RUNNING to FAILED on bc6eadd0-19bf-4924-9e6e-62ba5f239d1e @ localhost (dataPort=-1).

java.io.IOException: Writing records to JDBC failed.

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)

                at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)

                at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)

                at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)

                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)

                at org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)

                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)

                at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)

                at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)

                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)

                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)

                at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)

                at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)

                at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)

                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)

                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.io.IOException: Reestablish JDBC connection failed

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)

                ... 29 more

Caused by: java.sql.SQLRecoverableException: Closed Connection

                at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)

                at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)

                at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)

                at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)

                at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)

                at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)

                at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)

                ... 30 more

 

 

Thanks,

 

Best regards,

Fuyao

 
