akka.pattern.AskTimeoutException: Ask timed out (after upgrading to flink 1.4.0)

Bart Kastermans
I have upgraded to Flink 1.4.0, running just a local job manager and task manager
(flink/bin/start-cluster.sh). After solving the dependency issues, I now consistently get
the error below on one specific job. As the message means nothing to me (other than
that I realise Flink uses Akka), I have no idea where to start debugging.
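
For reference, the 10000 ms in the error seems to match Flink's default Akka ask timeout,
which as far as I can tell is controlled by akka.ask.timeout in flink-conf.yaml. Raising it is
presumably only a workaround, but it would at least tell me whether the TaskManager is just
slow to respond:

    # flink-conf.yaml (sketch; "10 s" is the documented default, matching the error above)
    akka.ask.timeout: 60 s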

The job that fails this way reads from Kafka with FlinkKafkaConsumer010 and writes to
Postgres over a JDBC connection.

- bart


Stack trace:

java.lang.Exception: Cannot deploy task Source: Custom Source -> com.kpn.datalab.inhome.StoreInKV$ mapAddKrnToEvent -> Sink: Unnamed (1/1) (676bd6fcaff976cd72ea3672c45354ce) - TaskManager (2855fef5d0fb313cdb47fbd6350f81aa @ flink-1682409794-3n335 (dataPort=37534)) not responding after a timeout of 10000 ms
        at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$3(Execution.java:529)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink-1682409794-3n335:42536/user/taskmanager#262833087]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
        at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
        ... 1 more

Job code:

package com.datalab.inhome

import java.util.Properties
import java.util.function.BiConsumer

import com.datalab.SchemaStore
import com.datalab.commons.Utils._
import com.datalab.commons.flink.ByteArraySerializer
import com.datalab.commons.flink.ByteArraySerializer.MyByteArray

import com.datalab.schemas.inhome.model_result.v1_1_1.ModelResult
import org.apache.flink.api.common.functions.RichMapFunction

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.json.JSONObject
import org.slf4j.{Logger, LoggerFactory}

class mapAddUserId(schemaBaseDirectory: String) extends RichMapFunction[MyByteArray, (String, MyByteArray)] {
  val log: Logger = LoggerFactory.getLogger(this.getClass)

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
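    // initialize the schema registry once per parallel task instance, on the TaskManager,
    // before any records are processed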
    SchemaStore.initialize(schemaBaseDirectory)
  }

  override def map(value: MyByteArray): (String, MyByteArray) = {
    val event = SchemaStore.instance().deserializeEvent(value.bytes)
    val userId = event.payload(classOf[ModelResult]).getProfile.getKrn.toString
    (userId, value)
  }
}

object StoreInKV {
  val log: Logger = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    val parameters = ParameterTool.fromArgs(args)
    val setup = RaeUtils.setup()
    val sourceTopic = parameters.get("sourceTopic", "modelResult")

    val kvTableName = parameters.get("kvTableName", "inhome_model_scores")

    val jobDescription = new JSONObject()
    parameters.toMap.forEach(new BiConsumer[String, String] {
      override def accept(t: String, u: String): Unit = jobDescription.put(t, u)
    })
    jobDescription.put("jar", this.getClass.getName)

    log.info("********************** Settings ********************************************")
    log.info(s"Job ${this.getClass} started at ${msToUtc(System.currentTimeMillis())}")
    log.info(s"Parameters: ${jobDescription.toString}")
    log.info(s"Setup: ${setup}")
    log.info("*****************************************************************************")

    val kafkaConsumerProps = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", setup.kafka_bootstrap)
    kafkaConsumerProps.setProperty("group.id",s"storeInKV-${scala.util.Random.nextInt}")
    kafkaConsumerProps.setProperty("client.id",s"storeInKV-${scala.util.Random.nextInt}")
    val source = new FlinkKafkaConsumer010[ByteArraySerializer.MyByteArray](sourceTopic, new ByteArraySerializer(),
      kafkaConsumerProps)

    val dbProps = new Properties()
    dbProps.setProperty("databaseConnect", setup.dbConnectionString)
    dbProps.setProperty("user", setup.dbUser)
    dbProps.setProperty("password", setup.dbPasswd)
    dbProps.setProperty("tableName", kvTableName)
    dbProps.setProperty("createTable", "false")    // TODO: remove table creating from KeyValueStore; should be done by orchestrator

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.addSource(source)
      .map(new mapAddUserId(setup.schemaBaseDirectory))
      .name(s"${this.getClass.getName} mapAddKrnToEvent")
      .addSink(new PostgresSinkKV(dbProps))

    env.execute(s"RAE_StoreInKV:: ${jobDescription.toString}")
  }
}
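
For completeness, PostgresSinkKV is not shown above; it is essentially a JDBC sink. A
simplified sketch of the idea (not the actual class; the table layout and SQL here are
illustrative only):

import java.util.Properties

import com.datalab.commons.flink.ByteArraySerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Sketch only: one JDBC connection per parallel sink instance, one INSERT per record.
class PostgresSinkKVSketch(props: Properties)
    extends RichSinkFunction[(String, ByteArraySerializer.MyByteArray)] {

  @transient private var conn: java.sql.Connection = _
  @transient private var stmt: java.sql.PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = java.sql.DriverManager.getConnection(
      props.getProperty("databaseConnect"),
      props.getProperty("user"),
      props.getProperty("password"))
    // hypothetical key/value table layout
    stmt = conn.prepareStatement(
      s"INSERT INTO ${props.getProperty("tableName")} (user_id, event) VALUES (?, ?)")
  }

  override def invoke(value: (String, ByteArraySerializer.MyByteArray)): Unit = {
    stmt.setString(1, value._1)
    stmt.setBytes(2, value._2.bytes)
    stmt.executeUpdate()
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}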