Dear Flink community,
I started to use Async Functions in Scala, Flink 1.2.0, in order to retrieve enriching information from MariaDB database. In order to do that, I firstly employed classical jdbc library (org.mariadb.jdbc) and it worked has expected. Due to the blocking behavior of jdbc, I'm trying to use this library https://github.com/mauricio/postgresql-async/tree/master/mysql-async which promises to offer a subset of features in a non-blocking fashion. Sadly I'm not able to use it. Following the async function code. object AsyncEnricher { case class OutputType(field1: FieldType, field2: FieldType) } class AsyncEnricher(configuration: MariaDBConfig) extends AsyncFunction[InputType, OutputType] with Serializable with AutoCloseable with LazyLogging { private val queryString = s"SELECT <column> FROM [table] WHERE <column_name> = <value>;" implicit lazy val executor = ExecutionContext.fromExecutor(Executors.directExecutor()) private lazy val mariaDBClient: Connection = { val config = createConfiguration(configuration) val connection = new MySQLConnection(config) Await.result(connection.connect, 5 seconds) } override def asyncInvoke(input: InputType, collector: AsyncCollector[OutputType]): Unit = { val queryResult = mariaDBClient.sendPreparedStatement(queryString, Seq(input.fieldToSearch)) queryResult.map(_.rows) onSuccess { case Some(resultSet) => Try { resultSet.head(0).asInstanceOf[FieldType] } match { case Success(value) => collector.collect(Iterable(OutputType(value, value))) case Failure(e) => logger.error(s"retrieving value from MariaDB raised $e: $queryString executed") } case _ => logger.error(s"value not found: $queryString executed") } queryResult onFailure { case e: Throwable => logger.error(s"retrieving location volume from MariaDB raised $e: $queryString executed") } } override def close(): Unit = { Try(mariaDBClient.disconnect).recover { case t: Throwable => logger.info(s"MariaDB cannot be closed - ${t.getMessage}") } } } Follows the stack TimerException{java.lang.IllegalStateException: Timer service is shut down} at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalStateException: Timer service is shut down at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118) at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218) ... 7 more java.lang.NullPointerException at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343) at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) I think it's involving connection.connect returning object which is a Future and so the Await. This is different than jdbc driver, which worked like a charm. I tried to move away the await from the lazy val. Can't wait for your opinion. Thank you so much in advance. Andrea |
I'm not too familiar with what's happening here, but maybe Klou (cc'd) can help?
On Thu, Mar 30, 2017 at 6:50 PM, Andrea Spina <[hidden email]> wrote: > Dear Flink community, > > I started to use Async Functions in Scala, Flink 1.2.0, in order to retrieve > enriching information from MariaDB database. In order to do that, I firstly > employed classical jdbc library (org.mariadb.jdbc) and it worked has > expected. > > Due to the blocking behavior of jdbc, I'm trying to use this library > https://github.com/mauricio/postgresql-async/tree/master/mysql-async > which promises to offer a subset of features in a non-blocking fashion. > > Sadly I'm not able to use it. > > Following the async function code. > > * > object AsyncEnricher { > case class OutputType(field1: FieldType, field2: FieldType) > } > > class AsyncEnricher(configuration: MariaDBConfig) > extends AsyncFunction[InputType, OutputType] > with Serializable > with AutoCloseable > with LazyLogging { > > private val queryString = s"SELECT <column> FROM [table] WHERE > <column_name> = <value>;" > > implicit lazy val executor = > ExecutionContext.fromExecutor(Executors.directExecutor()) > > private lazy val mariaDBClient: Connection = { > val config = createConfiguration(configuration) > val connection = new MySQLConnection(config) > Await.result(connection.connect, 5 seconds) > } > > override def asyncInvoke(input: InputType, collector: > AsyncCollector[OutputType]): Unit = { > > val queryResult = mariaDBClient.sendPreparedStatement(queryString, > Seq(input.fieldToSearch)) > > queryResult.map(_.rows) onSuccess { > case Some(resultSet) => > Try { > resultSet.head(0).asInstanceOf[FieldType] > } match { > case Success(value) => > collector.collect(Iterable(OutputType(value, value))) > case Failure(e) => > logger.error(s"retrieving value from MariaDB raised $e: > $queryString executed") > } > case _ => logger.error(s"value not found: $queryString executed") > } > > queryResult onFailure { > case e: Throwable => > logger.error(s"retrieving location volume from MariaDB raised $e: > $queryString executed") > } > > } > > override def close(): Unit = { > Try(mariaDBClient.disconnect).recover { > case t: Throwable => logger.info(s"MariaDB cannot be closed - > ${t.getMessage}") > } > } > > } > * > > Follows the stack > > / > TimerException{java.lang.IllegalStateException: Timer service is shut down} > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Timer service is shut down > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118) > at > org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218) > ... 7 more > > java.lang.NullPointerException > at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343) > at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > / > > I think it's involving connection.connect returning object which is a Future > and so the Await. This is different than jdbc driver, which worked like a > charm. I tried to move away the await from the lazy val. > > Can't wait for your opinion. Thank you so much in advance. > > Andrea > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Functions-and-Scala-async-client-for-mySql-MariaDB-database-connection-tp12469.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
Free forum by Nabble | Edit this page |