Hi Benchao,

I have run this in the code:

println(env.getConfig.getAutoWatermarkInterval)

and got 200. I do fully understand how watermarks and the AsyncOperator work, but I decided to make a simple test that measures the time it takes to enter the asyncInvoke method. It looks like it takes about 80 ms, which is longer than the time it takes to get a response from my micro-service.

Code below:

class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

  /*
  implicit val actorSystem = ActorSystem.apply("test", None, None, Some(executor))
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  println(materializer.system.name)
  println("start")
  */

  // redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com
  // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
  var actorSystem: ActorSystem = null
  var materializer: ActorMaterializer = null
  var executionContext: ExecutionContextExecutor = null
  //var akkaHttp: HttpExt = null

  override def open(parameters: Configuration): Unit = {
    actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString, Some(ConfigFactory.load("application.conf")), None, Some(executor))
    materializer = ActorMaterializer()(actorSystem)
    executionContext = actorSystem.dispatcher
    //akkaHttp = Http(actorSystem)
  }

  override def close(): Unit = {
    actorSystem.terminate()
  }

  override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    val start = str.toLong
    val delta = System.currentTimeMillis() - start
    resultFuture.complete(Iterable((str, s"${delta}")))
  }
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //env.enableCheckpointing(10)
    env.setParallelism(1)

    val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
    //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => System.currentTimeMillis()-s}.print()
    val x: DataStream[String] = someIntegers.map(_ => s"${System.currentTimeMillis()}")
    val resultStream: DataStream[(String, String)] = AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, TimeUnit.MILLISECONDS, 100) //.setParallelism(16)
    //AsyncDataStream.unorderedWait(data , new AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
    resultStream.print()

    println(env.getConfig.getAutoWatermarkInterval)
    env.execute("Flink Scala API Skeleton")
  }
}

Is this normal behavior?

On Mon, Jul 6, 2020 at 2:45 PM Benchao Li <[hidden email]> wrote:

Hi Mark,

According to your data, I think the config of AsyncOperator is OK. There is one more config that might affect the throughput of AsyncOperator: the watermark. Because the unordered async operator still keeps the order between watermarks, did you use event time in your job, and if yes, what's the watermark interval in your job?

Mark Zitnik <[hidden email]> wrote on Sun, Jul 5, 2020 at 7:44 PM:

Hi Benchao,

The capacity is 100
Parallelism is 8
RPC req is 20 ms

Thanks

On Sun, 5 Jul 2020, 6:16 Benchao Li, <[hidden email]> wrote:

Hi Mark,

Could you give more details about your Flink job?
- the capacity of AsyncDataStream
- the parallelism of the AsyncDataStream operator
- the time per blocked RPC request

Mark Zitnik <[hidden email]> wrote on Sun, Jul 5, 2020 at 3:48 AM:

Hi,

In my Flink application I need to enrich data using AsyncDataStream.unorderedWait, but I am getting poor performance. At the beginning I was just working with HTTP calls, but I have switched to gRPC. I am running on an 8-core node and getting a total of 3200 events per second. The service that I am using is not fully utilized and can produce up to 10000 req/sec.

Flink job flow:
Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write to Kafka

Using Akka gRPC code written in Scala.

Thanks

--
Best,
Benchao Li

--
Best,
Benchao Li
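
For reference, the figures quoted in this thread (capacity 100, parallelism 8, about 20 ms per RPC, 3200 events per second observed) can be turned into a rough back-of-the-envelope check. The sketch below is not from the original posts; it only applies Little's law (throughput = requests in flight / latency per request) and assumes that every subtask could keep its whole capacity in flight and that the 20 ms figure is the full per-request latency. The object and method names are made up for illustration.

```scala
object AsyncThroughputEstimate {
  // Upper bound on events/second for one async operator subtask, by Little's law:
  // throughput = requests in flight / latency per request.
  def maxPerSubtask(capacity: Int, latencyMs: Double): Double =
    capacity * 1000.0 / latencyMs

  // Average number of requests in flight implied by an observed throughput.
  def impliedInFlight(eventsPerSecond: Double, latencyMs: Double): Double =
    eventsPerSecond * latencyMs / 1000.0

  def main(args: Array[String]): Unit = {
    // Figures quoted in the thread.
    val capacity = 100
    val parallelism = 8
    val rpcLatencyMs = 20.0
    val observedPerSecond = 3200.0

    val bound = maxPerSubtask(capacity, rpcLatencyMs) * parallelism
    val inFlight = impliedInFlight(observedPerSecond, rpcLatencyMs)

    println(f"theoretical bound: $bound%.0f events/s")
    println(f"implied requests in flight: $inFlight%.0f of ${capacity * parallelism}")
  }
}
```

Under these assumptions the bound comes out at 40000 events per second, while 3200 events per second corresponds to only about 64 requests in flight out of a possible 800. That would suggest the async operator's capacity is not the limiting factor, which is consistent with Benchao's remark that the AsyncOperator config looks OK.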