Async Functions and Scala async-client for MySQL/MariaDB database connection

Andrea Spina
Dear Flink community,

I started using Async Functions in Scala (Flink 1.2.0) to retrieve enrichment data from a MariaDB database. At first I used the classic JDBC driver (org.mariadb.jdbc), and it worked as expected.

Because JDBC calls block, I'm now trying to use this library
https://github.com/mauricio/postgresql-async/tree/master/mysql-async
which promises to offer a subset of features in a non-blocking fashion.
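For comparison, a common way to keep using a blocking driver such as JDBC without stalling the Flink task thread is to run each query on a dedicated thread pool and hand back a `Future`. The stdlib-only sketch below illustrates that pattern. `blockingLookup` is a hypothetical stand-in for a JDBC query. The pool size of 4 is an arbitrary assumption, not a recommendation.

```scala
import java.util.concurrent.{ExecutorService, Executors}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Sketch: run a blocking lookup on a dedicated thread pool so the caller
// (e.g. Flink's task thread) only receives a Future and never blocks.
object BlockingLookupOnPool {
  // Dedicated pool for blocking calls; the size of 4 is an arbitrary assumption.
  val jdbcPool: ExecutorService = Executors.newFixedThreadPool(4)
  val jdbcContext: ExecutionContext = ExecutionContext.fromExecutorService(jdbcPool)

  // Hypothetical stand-in for a blocking JDBC query.
  def blockingLookup(key: String): String = {
    Thread.sleep(50) // simulates database round-trip latency
    s"value-for-$key"
  }

  // Returns immediately; blockingLookup runs on one of the pool's threads.
  def lookupAsync(key: String): Future[String] =
    Future(blockingLookup(key))(jdbcContext)
}
```

With this approach, only as many queries run at once as the pool has threads, even if the async operator allows more requests in flight. The pool also has to be shut down when the function is torn down.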

Unfortunately, I can't get it to work.

Here is the async function code:


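import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.{Failure, Success, Try}

import com.github.mauricio.async.db.Connection
import com.github.mauricio.async.db.mysql.MySQLConnection
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.runtime.concurrent.Executors
import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction}

// InputType, FieldType, MariaDBConfig and createConfiguration(...) are
// application-specific and defined elsewhere.
object AsyncEnricher {
  case class OutputType(field1: FieldType, field2: FieldType)
}

class AsyncEnricher(configuration: MariaDBConfig)
    extends AsyncFunction[InputType, AsyncEnricher.OutputType]
    with Serializable
    with AutoCloseable
    with LazyLogging {

  import AsyncEnricher.OutputType

  // Prepared statement: the looked-up value is bound to the `?` placeholder.
  private val queryString = "SELECT <column> FROM [table] WHERE <column_name> = ?;"

  // Runs Future callbacks on whichever thread completes the Future.
  @transient private implicit lazy val executor: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.directExecutor())

  // Created lazily on the task manager, so the connection itself is never serialized.
  @transient private lazy val mariaDBClient: Connection = {
    val config = createConfiguration(configuration)
    val connection = new MySQLConnection(config)
    Await.result(connection.connect, 5.seconds)
  }

  override def asyncInvoke(input: InputType, collector: AsyncCollector[OutputType]): Unit = {
    val queryResult = mariaDBClient.sendPreparedStatement(queryString, Seq(input.fieldToSearch))

    // Every branch completes the collector; otherwise the record stays pending
    // in the async operator until the configured timeout fires.
    queryResult.map(_.rows).onComplete {
      case Success(Some(resultSet)) =>
        Try(resultSet.head(0).asInstanceOf[FieldType]) match {
          case Success(value) =>
            collector.collect(Iterable(OutputType(value, value)))
          case Failure(e) =>
            logger.error(s"retrieving value from MariaDB raised $e: $queryString executed")
            collector.collect(Iterable.empty[OutputType])
        }
      case Success(None) =>
        logger.error(s"value not found: $queryString executed")
        collector.collect(Iterable.empty[OutputType])
      case Failure(e) =>
        logger.error(s"retrieving location volume from MariaDB raised $e: $queryString executed")
        collector.collect(Iterable.empty[OutputType]) // or collector.collect(e) to report the error
    }
  }

  // Flink only calls close() on RichFunctions, so this AutoCloseable.close()
  // is not invoked automatically.
  override def close(): Unit = {
    Try(mariaDBClient.disconnect).recover {
      case t: Throwable => logger.info(s"MariaDB cannot be closed - ${t.getMessage}")
    }
  }
}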
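Flink's async operator waits for each `asyncInvoke` call to complete its collector. A callback branch that only logs leaves that record pending until the operator's timeout fires. The stdlib-only sketch below shows one way to complete the result in every branch with a single `onComplete`. `ResultSink` is a hypothetical stand-in for Flink 1.2's Scala `AsyncCollector`. `Option[Seq[Seq[Any]]]` is an assumed simplification of mysql-async's `QueryResult.rows`.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for Flink 1.2's Scala AsyncCollector, which offers
// collect(Iterable[OUT]) for results and collect(Throwable) for errors.
trait ResultSink[T] {
  def collect(result: Iterable[T]): Unit
  def collect(error: Throwable): Unit
}

object CompleteEveryBranch {
  // Runs callbacks on the thread that completes the Future, similar in spirit
  // to ExecutionContext.fromExecutor(Executors.directExecutor()).
  private implicit val direct: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  // `rows` stands in for QueryResult.rows: an optional table of cells.
  def handle(rows: Future[Option[Seq[Seq[Any]]]], sink: ResultSink[String]): Unit =
    rows.onComplete {
      case Success(Some(table)) =>
        // An empty table or a non-String first cell both end up in Failure.
        Try(table.head(0).asInstanceOf[String]) match {
          case Success(value) => sink.collect(Iterable(value))
          case Failure(_)     => sink.collect(Iterable.empty[String])
        }
      case Success(None) => sink.collect(Iterable.empty[String]) // no result set
      case Failure(e)    => sink.collect(e)                      // query itself failed
    }
}
```

Emitting an empty result skips the record, while `collect(e)` reports the error to Flink. The right choice depends on whether a missing or failed lookup should stop the job.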


Here is the stack trace:


TimerException{java.lang.IllegalStateException: Timer service is shut down}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Timer service is shut down
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118)
        at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
        ... 7 more

java.lang.NullPointerException
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)


I suspect the problem is related to connection.connect returning a Future, which forces the Await. This is different from the JDBC driver, which worked like a charm. I also tried moving the Await out of the lazy val.
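One way to avoid blocking on `connection.connect` at all is to keep the connection as a lazily created `Future` and chain every query onto it with `flatMap`. This is a sketch under stated assumptions, not a diagnosis of the stack traces above. `FakeConnection` is a hypothetical stand-in that only mimics the Future-returning shape of mysql-async's `connect` and `sendPreparedStatement`.

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical stand-in for an async client whose connect returns a Future,
// as mysql-async's Connection.connect does.
final class FakeConnection(ec: ExecutionContext) {
  private implicit val executor: ExecutionContext = ec
  val connectCalls = new AtomicInteger(0)

  def connect: Future[FakeConnection] = Future { connectCalls.incrementAndGet(); this }

  def query(sql: String, values: Seq[Any]): Future[String] =
    Future(s"$sql -> ${values.mkString(",")}")
}

// Sketch: no thread waits for the connection; each lookup is chained onto it.
final class LazyConnectionEnricher(ec: ExecutionContext) {
  private implicit val executor: ExecutionContext = ec
  val client = new FakeConnection(ec)

  // Created once on first use (lazy val initialization is synchronized),
  // then shared by all lookups.
  private lazy val connection: Future[FakeConnection] = client.connect

  def lookup(key: String): Future[String] =
    connection.flatMap(_.query("SELECT <column> FROM [table] WHERE <column_name> = ?", Seq(key)))
}
```

One trade-off: if the initial connect fails, the lazy val keeps the failed `Future`, so every later lookup fails as well. A real implementation would need some reconnect strategy.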

Can't wait for your opinion. Thank you so much in advance.

Andrea

Re: Async Functions and Scala async-client for MySQL/MariaDB database connection

Ufuk Celebi
I'm not too familiar with what's happening here, but maybe Klou (cc'd) can help?

Replying to Andrea Spina's message of Thu, Mar 30, 2017 at 6:50 PM.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Functions-and-Scala-async-client-for-mySql-MariaDB-database-connection-tp12469.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.