I have upgraded to flink-1.4.0, with just a local task manager and job manager (flink/bin/start-cluster.sh). After solving the dependency issues, I now get the error below consistently on one specific job. As it means absolutely nothing to me (other than that I realise Flink uses Akka), I have no idea where to start debugging. The job that errors this way reads from Kafka with FlinkKafkaConsumer010 and writes to Postgres over a JDBC connection.

- bart

Stack trace:

java.lang.Exception: Cannot deploy task Source: Custom Source -> com.kpn.datalab.inhome.StoreInKV$ mapAddKrnToEvent -> Sink: Unnamed (1/1) (676bd6fcaff976cd72ea3672c45354ce) - TaskManager (2855fef5d0fb313cdb47fbd6350f81aa @ flink-1682409794-3n335 (dataPort=37534)) not responding after a timeout of 10000 ms
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$3(Execution.java:529)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink-1682409794-3n335:42536/user/taskmanager#262833087]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    ... 1 more
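The only lead I have so far is that the 10000 ms looks like the default akka.ask.timeout (10 s), so I assume it can be raised in flink-conf.yaml while debugging, to see whether the TaskManager is merely slow to respond rather than gone. This is just a guess from the configuration docs, not something I have verified:

    # flink-conf.yaml (assumption: only the ask timeout is changed for debugging)
    akka.ask.timeout: 60 s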
Job code:

package com.datalab.inhome

import java.util.Properties
import java.util.function.BiConsumer

import com.datalab.SchemaStore
import com.datalab.commons.Utils._
import com.datalab.commons.flink.ByteArraySerializer
import com.datalab.commons.flink.ByteArraySerializer.MyByteArray
import com.datalab.schemas.inhome.model_result.v1_1_1.ModelResult
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.json.JSONObject
import org.slf4j.{Logger, LoggerFactory}

// Keys each raw event by the user id (krn) taken from its ModelResult payload.
class mapAddUserId(schemaBaseDirectory: String) extends RichMapFunction[MyByteArray, (String, MyByteArray)] {
  val log: Logger = LoggerFactory.getLogger(this.getClass)

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    SchemaStore.initialize(schemaBaseDirectory)
  }

  override def map(value: MyByteArray): (String, MyByteArray) = {
    val event = SchemaStore.instance().deserializeEvent(value.bytes)
    val userId = event.payload(classOf[ModelResult]).getProfile.getKrn.toString
    (userId, value)
  }
}

object StoreInKV {
  val log: Logger = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    val parameters = ParameterTool.fromArgs(args)
    val setup = RaeUtils.setup()
    val sourceTopic = parameters.get("sourceTopic", "modelResult")
    val kvTableName = parameters.get("kvTableName", "inhome_model_scores")

    // Collect all job parameters into a JSON blob used for logging and the job name.
    val jobDescription = new JSONObject()
    parameters.toMap.forEach(new BiConsumer[String, String] {
      override def accept(t: String, u: String): Unit = jobDescription.put(t, u)
    })
    jobDescription.put("jar", this.getClass.getName)

    log.info("********************** Settings ********************************************")
    log.info(s"Job ${this.getClass} started at ${msToUtc(System.currentTimeMillis())}")
    log.info(s"Parameters: ${jobDescription.toString}")
    log.info(s"Setup: ${setup}")
    log.info("*****************************************************************************")

    val kafkaConsumerProps = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", setup.kafka_bootstrap)
    kafkaConsumerProps.setProperty("group.id", s"storeInKV-${scala.util.Random.nextInt}")
    kafkaConsumerProps.setProperty("client.id", s"storeInKV-${scala.util.Random.nextInt}")
    val source = new FlinkKafkaConsumer010[ByteArraySerializer.MyByteArray](sourceTopic, new ByteArraySerializer(), kafkaConsumerProps)

    val dbProps = new Properties()
    dbProps.setProperty("databaseConnect", setup.dbConnectionString)
    dbProps.setProperty("user", setup.dbUser)
    dbProps.setProperty("password", setup.dbPasswd)
    dbProps.setProperty("tableName", kvTableName)
    dbProps.setProperty("createTable", "false") // TODO: remove table creating from KeyValueStore; should be done by orchestrator

    // Pipeline: Kafka source -> add user id -> Postgres key/value sink.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(source)
      .map(new mapAddUserId(setup.schemaBaseDirectory))
      .name(s"${this.getClass.getName} mapAddKrnToEvent")
      .addSink(new PostgresSinkKV(dbProps))

    env.execute(s"RAE_StoreInKV:: ${jobDescription.toString}")
  }
}