Scalaj vs akka as http client for Asyncio Flink

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Scalaj vs akka as http client for Asyncio Flink

andy
Hi guys,

I’m try to decide which http client to go with Flink, currently I tested with scalaj and akka http client and both work ok with our current dev environment.
For scalaj its is pretty straight forward since its is just calling an http request with its timeout.

For akka http client its a bit more complicated (I’m new to scala and all), so I’m asking if am I doing it right by create a AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###########DONE ------------------- ")


resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
I notice that I have to implicit create a bunch of variable inside the asyncInvoke method. I’m not sure if I’m doing it right, or just adding the overhead. I did try to init them in the constructor of the class but the compiler just throw a bunch of Not implemented Serializer error.

My lib:
   "com.typesafe.akka" %% "akka-http" % "10.1.8",
  "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My flink:
scala 2.12
flink 1.70



Any reply are appreciated!

Thanks a bunch

Andy,



Reply | Threaded
Open this post in threaded view
|

Re: Scalaj vs akka as http client for Asyncio Flink

Till Rohrmann
Hi Andy,

without being an expert of Akka's http client, I think you should not create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would recommend you instead is to implement a `RichAsyncFunction` with a transient field for `ActorMaterializer` which you initialize in the `RichAsyncFunction#open` method. That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability and you make it much more efficient because you don't create a new `ActorSystem` for every request.

Cheers,
Till

On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[hidden email]> wrote:
Hi guys,

I’m try to decide which http client to go with Flink, currently I tested with scalaj and akka http client and both work ok with our current dev environment.
For scalaj its is pretty straight forward since its is just calling an http request with its timeout.

For akka http client its a bit more complicated (I’m new to scala and all), so I’m asking if am I doing it right by create a AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###########DONE ------------------- ")


resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
I notice that I have to implicit create a bunch of variable inside the asyncInvoke method. I’m not sure if I’m doing it right, or just adding the overhead. I did try to init them in the constructor of the class but the compiler just throw a bunch of Not implemented Serializer error.

My lib:
   "com.typesafe.akka" %% "akka-http" % "10.1.8",
  "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My flink:
scala 2.12
flink 1.70



Any reply are appreciated!

Thanks a bunch

Andy,



Reply | Threaded
Open this post in threaded view
|

Re: Scalaj vs akka as http client for Asyncio Flink

andy
Hi Till, 
Thanks for your reply, I manage do some experiments and has result as some worked and some not. I hope you can give me a bit more insight:

As your suggestion to impl a `RichAsyncFunction` with transient field, like this and having error

```
Class 'RichAsyncHttpClient' must either be declared abstract or implement abstract member 'executionContext: ExecutionContextExecutor' in ‘com.parcelperform.util.RichAsyncHttpClient’
```

```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
@transient implicit var materializer: ActorMaterializer
@transient implicit var system: ActorSystem
@transient implicit var executionContext: ExecutionContextExecutor


override def asyncInvoke(input: Shipment, resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
}
case Failure(x) => {
resultFuture.complete(Iterable(Either(x)).asJavaCollection)
}
}
}

override def open(parameters: Configuration): Unit = {
super.open(parameters)
system = ActorSystem("my-system")
executionContext = system.dispatcher
materializer = ActorMaterializer()
}
}
```

Aslo the Usage of that class, I has error, I guess its because of java/scala issue. In flink doc, for java code they use RichAsyncFunction and for scala they use AsyncFunction:
```
//    AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ###### error Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient

```

### 

So I try to fix my current code again with transient field and move it into constructor:
```


class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

@transient implicit lazy val system = {
PPLogger.getActivityLogger.info("###########INIT ------------------- ")
ActorSystem("my-system")
}
@transient implicit lazy val executionContext = {
system.dispatcher
}
@transient implicit lazy val materializer: ActorMaterializer = {
PPLogger.getActivityLogger.info("###########DONE ------------------- ")
ActorMaterializer()
}

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
// PPLogger.getActivityLogger.info("###########INIT ------------------- ")
// implicit val system = ActorSystem("my-system")
// implicit val executionContext = system.dispatcher
// implicit val materializer: ActorMaterializer = ActorMaterializer()
// PPLogger.getActivityLogger.info("###########DONE ------------------- ")

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
And its run ok. The log was print only one. 

I still asking about this because I haven’t understand the term `That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability `. I though for all the code executed inside TaskManger?

Thanks for being patient with me, till here

Andy,




On Apr 11, 2019, at 7:12 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

without being an expert of Akka's http client, I think you should not create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would recommend you instead is to implement a `RichAsyncFunction` with a transient field for `ActorMaterializer` which you initialize in the `RichAsyncFunction#open` method. That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability and you make it much more efficient because you don't create a new `ActorSystem` for every request.

Cheers,
Till

On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[hidden email]> wrote:
Hi guys,

I’m try to decide which http client to go with Flink, currently I tested with scalaj and akka http client and both work ok with our current dev environment.
For scalaj its is pretty straight forward since its is just calling an http request with its timeout.

For akka http client its a bit more complicated (I’m new to scala and all), so I’m asking if am I doing it right by create a AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###########DONE ------------------- ")


resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
I notice that I have to implicit create a bunch of variable inside the asyncInvoke method. I’m not sure if I’m doing it right, or just adding the overhead. I did try to init them in the constructor of the class but the compiler just throw a bunch of Not implemented Serializer error.

My lib:
   "com.typesafe.akka" %% "akka-http" % "10.1.8",
  "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My flink:
scala 2.12
flink 1.70



Any reply are appreciated!

Thanks a bunch

Andy,




Reply | Threaded
Open this post in threaded view
|

Re: Scalaj vs akka as http client for Asyncio Flink

Till Rohrmann
Hi Andy,

there is also a Scala version of the `RichAsyncFunction`.

In Scala you have to specify a value for class members. This is different from Java.

User code is first instantiated on the client where you create the job topology (basically where you call new RichAsyncHttpClient). The code is then serialized and shipped to the cluster where it is actually executed.

Cheers,
Till

On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <[hidden email]> wrote:
Hi Till, 
Thanks for your reply, I manage do some experiments and has result as some worked and some not. I hope you can give me a bit more insight:

As your suggestion to impl a `RichAsyncFunction` with transient field, like this and having error

```
Class 'RichAsyncHttpClient' must either be declared abstract or implement abstract member 'executionContext: ExecutionContextExecutor' in ‘com.parcelperform.util.RichAsyncHttpClient’
```

```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
@transient implicit var materializer: ActorMaterializer
@transient implicit var system: ActorSystem
@transient implicit var executionContext: ExecutionContextExecutor


override def asyncInvoke(input: Shipment, resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
}
case Failure(x) => {
resultFuture.complete(Iterable(Either(x)).asJavaCollection)
}
}
}

override def open(parameters: Configuration): Unit = {
super.open(parameters)
system = ActorSystem("my-system")
executionContext = system.dispatcher
materializer = ActorMaterializer()
}
}
```

Aslo the Usage of that class, I has error, I guess its because of java/scala issue. In flink doc, for java code they use RichAsyncFunction and for scala they use AsyncFunction:
```
//    AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ###### error Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient

```

### 

So I try to fix my current code again with transient field and move it into constructor:
```


class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

@transient implicit lazy val system = {
PPLogger.getActivityLogger.info("###########INIT ------------------- ")
ActorSystem("my-system")
}
@transient implicit lazy val executionContext = {
system.dispatcher
}
@transient implicit lazy val materializer: ActorMaterializer = {
PPLogger.getActivityLogger.info("###########DONE ------------------- ")
ActorMaterializer()
}

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
// PPLogger.getActivityLogger.info("###########INIT ------------------- ")
// implicit val system = ActorSystem("my-system")
// implicit val executionContext = system.dispatcher
// implicit val materializer: ActorMaterializer = ActorMaterializer()
// PPLogger.getActivityLogger.info("###########DONE ------------------- ")

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
And its run ok. The log was print only one. 

I still asking about this because I haven’t understand the term `That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability `. I though for all the code executed inside TaskManger?

Thanks for being patient with me, till here

Andy,




On Apr 11, 2019, at 7:12 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

without being an expert of Akka's http client, I think you should not create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would recommend you instead is to implement a `RichAsyncFunction` with a transient field for `ActorMaterializer` which you initialize in the `RichAsyncFunction#open` method. That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability and you make it much more efficient because you don't create a new `ActorSystem` for every request.

Cheers,
Till

On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[hidden email]> wrote:
Hi guys,

I’m try to decide which http client to go with Flink, currently I tested with scalaj and akka http client and both work ok with our current dev environment.
For scalaj its is pretty straight forward since its is just calling an http request with its timeout.

For akka http client its a bit more complicated (I’m new to scala and all), so I’m asking if am I doing it right by create a AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###########DONE ------------------- ")


resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
I notice that I have to implicit create a bunch of variable inside the asyncInvoke method. I’m not sure if I’m doing it right, or just adding the overhead. I did try to init them in the constructor of the class but the compiler just throw a bunch of Not implemented Serializer error.

My lib:
   "com.typesafe.akka" %% "akka-http" % "10.1.8",
  "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My flink:
scala 2.12
flink 1.70



Any reply are appreciated!

Thanks a bunch

Andy,




Reply | Threaded
Open this post in threaded view
|

Re: Scalaj vs akka as http client for Asyncio Flink

andy
Hi Till,
Unfortunately I have to wait for the cluster to upgrade to 1.8 to use that feature: https://issues.apache.org/jira/browse/FLINK-6756
Meanwhile I can reimplement it in the copy-patse manner but I’m still curious if my AsyncHttpClient
 work nicely or not, what would be the down side when you look at it.
I understand the open/close method is will help in term of init/cleaning resource, but how can we benchmark the solution to make sure one is better than the other? What is the key to decide here or we have to try it in production first?
Thank a lot, again

Andy,


On Apr 12, 2019, at 2:44 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

there is also a Scala version of the `RichAsyncFunction`.

In Scala you have to specify a value for class members. This is different from Java.

User code is first instantiated on the client where you create the job topology (basically where you call new RichAsyncHttpClient). The code is then serialized and shipped to the cluster where it is actually executed.

Cheers,
Till

On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <[hidden email]> wrote:
Hi Till, 
Thanks for your reply, I manage do some experiments and has result as some worked and some not. I hope you can give me a bit more insight:

As your suggestion to impl a `RichAsyncFunction` with transient field, like this and having error

```
Class 'RichAsyncHttpClient' must either be declared abstract or implement abstract member 'executionContext: ExecutionContextExecutor' in ‘com.parcelperform.util.RichAsyncHttpClient’
```

```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
@transient implicit var materializer: ActorMaterializer
@transient implicit var system: ActorSystem
@transient implicit var executionContext: ExecutionContextExecutor


override def asyncInvoke(input: Shipment, resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
}
case Failure(x) => {
resultFuture.complete(Iterable(Either(x)).asJavaCollection)
}
}
}

override def open(parameters: Configuration): Unit = {
super.open(parameters)
system = ActorSystem("my-system")
executionContext = system.dispatcher
materializer = ActorMaterializer()
}
}
```

Aslo the Usage of that class, I has error, I guess its because of java/scala issue. In flink doc, for java code they use RichAsyncFunction and for scala they use AsyncFunction:
```
//    AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ###### error Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient

```

### 

So I try to fix my current code again with transient field and move it into constructor:
```


class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

@transient implicit lazy val system = {
PPLogger.getActivityLogger.info("###########INIT ------------------- ")
ActorSystem("my-system")
}
@transient implicit lazy val executionContext = {
system.dispatcher
}
@transient implicit lazy val materializer: ActorMaterializer = {
PPLogger.getActivityLogger.info("###########DONE ------------------- ")
ActorMaterializer()
}

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
// PPLogger.getActivityLogger.info("###########INIT ------------------- ")
// implicit val system = ActorSystem("my-system")
// implicit val executionContext = system.dispatcher
// implicit val materializer: ActorMaterializer = ActorMaterializer()
// PPLogger.getActivityLogger.info("###########DONE ------------------- ")

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
And its run ok. The log was print only one. 

I still asking about this because I haven’t understand the term `That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability `. I though for all the code executed inside TaskManger?

Thanks for being patient with me, till here

Andy,




On Apr 11, 2019, at 7:12 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

without being an expert of Akka's http client, I think you should not create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would recommend you instead is to implement a `RichAsyncFunction` with a transient field for `ActorMaterializer` which you initialize in the `RichAsyncFunction#open` method. That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability and you make it much more efficient because you don't create a new `ActorSystem` for every request.

Cheers,
Till

On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[hidden email]> wrote:
Hi guys,

I’m try to decide which http client to go with Flink, currently I tested with scalaj and akka http client and both work ok with our current dev environment.
For scalaj its is pretty straight forward since its is just calling an http request with its timeout.

For akka http client its a bit more complicated (I’m new to scala and all), so I’m asking if am I doing it right by create a AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###########DONE ------------------- ")


resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
I notice that I have to implicit create a bunch of variable inside the asyncInvoke method. I’m not sure if I’m doing it right, or just adding the overhead. I did try to init them in the constructor of the class but the compiler just throw a bunch of Not implemented Serializer error.

My lib:
   "com.typesafe.akka" %% "akka-http" % "10.1.8",
  "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My flink:
scala 2.12
flink 1.70



Any reply are appreciated!

Thanks a bunch

Andy,





Reply | Threaded
Open this post in threaded view
|

Re: Scalaj vs akka as http client for Asyncio Flink

Till Rohrmann
Hi Andy,

you can do some micro benchmarks where you instantiate your AsyncHttpClient and call the invoke method. But better would be to benchmark it end-to-end by running it on a cluster with a realistic workload which you also expect to occur in production.

Cheers,
Till

On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <[hidden email]> wrote:
Hi Till,
Unfortunately I have to wait for the cluster to upgrade to 1.8 to use that feature: https://issues.apache.org/jira/browse/FLINK-6756
Meanwhile I can reimplement it in the copy-patse manner but I’m still curious if my AsyncHttpClient
 work nicely or not, what would be the down side when you look at it.
I understand the open/close method is will help in term of init/cleaning resource, but how can we benchmark the solution to make sure one is better than the other? What is the key to decide here or we have to try it in production first?
Thank a lot, again

Andy,


On Apr 12, 2019, at 2:44 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

there is also a Scala version of the `RichAsyncFunction`.

In Scala you have to specify a value for class members. This is different from Java.

User code is first instantiated on the client where you create the job topology (basically where you call new RichAsyncHttpClient). The code is then serialized and shipped to the cluster where it is actually executed.

Cheers,
Till

On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <[hidden email]> wrote:
Hi Till, 
Thanks for your reply, I manage do some experiments and has result as some worked and some not. I hope you can give me a bit more insight:

As your suggestion to impl a `RichAsyncFunction` with transient field, like this and having error

```
Class 'RichAsyncHttpClient' must either be declared abstract or implement abstract member 'executionContext: ExecutionContextExecutor' in ‘com.parcelperform.util.RichAsyncHttpClient’
```

```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
@transient implicit var materializer: ActorMaterializer
@transient implicit var system: ActorSystem
@transient implicit var executionContext: ExecutionContextExecutor


override def asyncInvoke(input: Shipment, resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
}
case Failure(x) => {
resultFuture.complete(Iterable(Either(x)).asJavaCollection)
}
}
}

override def open(parameters: Configuration): Unit = {
super.open(parameters)
system = ActorSystem("my-system")
executionContext = system.dispatcher
materializer = ActorMaterializer()
}
}
```

Aslo the Usage of that class, I has error, I guess its because of java/scala issue. In flink doc, for java code they use RichAsyncFunction and for scala they use AsyncFunction:
```
//    AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ###### error Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient

```

### 

So I try to fix my current code again with transient field and move it into constructor:
```


class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

@transient implicit lazy val system = {
PPLogger.getActivityLogger.info("###########INIT ------------------- ")
ActorSystem("my-system")
}
@transient implicit lazy val executionContext = {
system.dispatcher
}
@transient implicit lazy val materializer: ActorMaterializer = {
PPLogger.getActivityLogger.info("###########DONE ------------------- ")
ActorMaterializer()
}

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
// PPLogger.getActivityLogger.info("###########INIT ------------------- ")
// implicit val system = ActorSystem("my-system")
// implicit val executionContext = system.dispatcher
// implicit val materializer: ActorMaterializer = ActorMaterializer()
// PPLogger.getActivityLogger.info("###########DONE ------------------- ")

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
And its run ok. The log was print only one. 

I still asking about this because I haven’t understand the term `That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability `. I though for all the code executed inside TaskManger?

Thanks for being patient with me, till here

Andy,




On Apr 11, 2019, at 7:12 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

without being an expert of Akka's http client, I think you should not create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would recommend you instead is to implement a `RichAsyncFunction` with a transient field for `ActorMaterializer` which you initialize in the `RichAsyncFunction#open` method. That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability and you make it much more efficient because you don't create a new `ActorSystem` for every request.

Cheers,
Till

On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[hidden email]> wrote:
Hi guys,

I’m try to decide which http client to go with Flink, currently I tested with scalaj and akka http client and both work ok with our current dev environment.
For scalaj its is pretty straight forward since its is just calling an http request with its timeout.

For akka http client its a bit more complicated (I’m new to scala and all), so I’m asking if am I doing it right by create a AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###########DONE ------------------- ")


resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
I notice that I have to implicit create a bunch of variable inside the asyncInvoke method. I’m not sure if I’m doing it right, or just adding the overhead. I did try to init them in the constructor of the class but the compiler just throw a bunch of Not implemented Serializer error.

My lib:
   "com.typesafe.akka" %% "akka-http" % "10.1.8",
  "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My flink:
scala 2.12
flink 1.70



Any reply are appreciated!

Thanks a bunch

Andy,





Reply | Threaded
Open this post in threaded view
|

Re: Scalaj vs akka as http client for Asyncio Flink

andy
Hi Till,

Sorry to bother you again, so I manage to build and work with akka http client in my local
After deploy to yarn node, the actorsystem cant be connected.
 ```
implicit  val system = ActorSystem("my-system")
```
So the line ### 2 was never print, and the method ended up timeout. Honestly I dont know how to debug with this case.

I’m just curious how people ended up using any async http client without hassle?

Thanks,

Andy,

On Apr 12, 2019, at 10:23 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

you can do some micro benchmarks where you instantiate your AsyncHttpClient and call the invoke method. But better would be to benchmark it end-to-end by running it on a cluster with a realistic workload which you also expect to occur in production.

Cheers,
Till

On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <[hidden email]> wrote:
Hi Till,
Unfortunately I have to wait for the cluster to upgrade to 1.8 to use that feature: https://issues.apache.org/jira/browse/FLINK-6756
Meanwhile I can reimplement it in the copy-patse manner but I’m still curious if my AsyncHttpClient
 work nicely or not, what would be the down side when you look at it.
I understand the open/close method is will help in term of init/cleaning resource, but how can we benchmark the solution to make sure one is better than the other? What is the key to decide here or we have to try it in production first?
Thank a lot, again

Andy,


On Apr 12, 2019, at 2:44 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

there is also a Scala version of the `RichAsyncFunction`.

In Scala you have to specify a value for class members. This is different from Java.

User code is first instantiated on the client where you create the job topology (basically where you call new RichAsyncHttpClient). The code is then serialized and shipped to the cluster where it is actually executed.

Cheers,
Till

On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <[hidden email]> wrote:
Hi Till, 
Thanks for your reply, I manage do some experiments and has result as some worked and some not. I hope you can give me a bit more insight:

As your suggestion to impl a `RichAsyncFunction` with transient field, like this and having error

```
Class 'RichAsyncHttpClient' must either be declared abstract or implement abstract member 'executionContext: ExecutionContextExecutor' in ‘com.parcelperform.util.RichAsyncHttpClient’
```

```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
@transient implicit var materializer: ActorMaterializer
@transient implicit var system: ActorSystem
@transient implicit var executionContext: ExecutionContextExecutor


override def asyncInvoke(input: Shipment, resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
}
case Failure(x) => {
resultFuture.complete(Iterable(Either(x)).asJavaCollection)
}
}
}

override def open(parameters: Configuration): Unit = {
super.open(parameters)
system = ActorSystem("my-system")
executionContext = system.dispatcher
materializer = ActorMaterializer()
}
}
```

Aslo the Usage of that class, I has error, I guess its because of java/scala issue. In flink doc, for java code they use RichAsyncFunction and for scala they use AsyncFunction:
```
//    AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ###### error Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient

```

### 

So I try to fix my current code again with transient field and move it into constructor:
```


class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

@transient implicit lazy val system = {
PPLogger.getActivityLogger.info("###########INIT ------------------- ")
ActorSystem("my-system")
}
@transient implicit lazy val executionContext = {
system.dispatcher
}
@transient implicit lazy val materializer: ActorMaterializer = {
PPLogger.getActivityLogger.info("###########DONE ------------------- ")
ActorMaterializer()
}

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
// PPLogger.getActivityLogger.info("###########INIT ------------------- ")
// implicit val system = ActorSystem("my-system")
// implicit val executionContext = system.dispatcher
// implicit val materializer: ActorMaterializer = ActorMaterializer()
// PPLogger.getActivityLogger.info("###########DONE ------------------- ")

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
And its run ok. The log was print only one. 

I still asking about this because I haven’t understand the term `That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability `. I though for all the code executed inside TaskManger?

Thanks for being patient with me, till here

Andy,




On Apr 11, 2019, at 7:12 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

without being an expert of Akka's http client, I think you should not create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would recommend you instead is to implement a `RichAsyncFunction` with a transient field for `ActorMaterializer` which you initialize in the `RichAsyncFunction#open` method. That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability and you make it much more efficient because you don't create a new `ActorSystem` for every request.

Cheers,
Till

On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[hidden email]> wrote:
Hi guys,

I’m try to decide which http client to go with Flink, currently I tested with scalaj and akka http client and both work ok with our current dev environment.
For scalaj its is pretty straight forward since its is just calling an http request with its timeout.

For akka http client its a bit more complicated (I’m new to scala and all), so I’m asking if am I doing it right by create a AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###########DONE ------------------- ")


resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
I notice that I have to implicit create a bunch of variable inside the asyncInvoke method. I’m not sure if I’m doing it right, or just adding the overhead. I did try to init them in the constructor of the class but the compiler just throw a bunch of Not implemented Serializer error.

My lib:
   "com.typesafe.akka" %% "akka-http" % "10.1.8",
  "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My flink:
scala 2.12
flink 1.70



Any reply are appreciated!

Thanks a bunch

Andy,






Reply | Threaded
Open this post in threaded view
|

Re: Scalaj vs akka as http client for Asyncio Flink

Till Rohrmann
Check the logs what Akka is logging and verify that the port you try to bind to is free.

Cheers,
Till

On Wed, Apr 17, 2019 at 12:50 PM Andy Hoang <[hidden email]> wrote:
Hi Till,

Sorry to bother you again, so I manage to build and work with akka http client in my local
After deploy to yarn node, the actorsystem cant be connected.
 ```
implicit  val system = ActorSystem("my-system")
```
So the line ### 2 was never print, and the method ended up timeout. Honestly I dont know how to debug with this case.

I’m just curious how people ended up using any async http client without hassle?

Thanks,

Andy,

On Apr 12, 2019, at 10:23 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

you can do some micro benchmarks where you instantiate your AsyncHttpClient and call the invoke method. But better would be to benchmark it end-to-end by running it on a cluster with a realistic workload which you also expect to occur in production.

Cheers,
Till

On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <[hidden email]> wrote:
Hi Till,
Unfortunately I have to wait for the cluster to upgrade to 1.8 to use that feature: https://issues.apache.org/jira/browse/FLINK-6756
Meanwhile I can reimplement it in the copy-patse manner but I’m still curious if my AsyncHttpClient
 work nicely or not, what would be the down side when you look at it.
I understand the open/close method is will help in term of init/cleaning resource, but how can we benchmark the solution to make sure one is better than the other? What is the key to decide here or we have to try it in production first?
Thank a lot, again

Andy,


On Apr 12, 2019, at 2:44 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

there is also a Scala version of the `RichAsyncFunction`.

In Scala you have to specify a value for class members. This is different from Java.

User code is first instantiated on the client where you create the job topology (basically where you call new RichAsyncHttpClient). The code is then serialized and shipped to the cluster where it is actually executed.

Cheers,
Till

On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <[hidden email]> wrote:
Hi Till, 
Thanks for your reply, I manage do some experiments and has result as some worked and some not. I hope you can give me a bit more insight:

As your suggestion to impl a `RichAsyncFunction` with transient field, like this and having error

```
Class 'RichAsyncHttpClient' must either be declared abstract or implement abstract member 'executionContext: ExecutionContextExecutor' in ‘com.parcelperform.util.RichAsyncHttpClient’
```

```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
@transient implicit var materializer: ActorMaterializer
@transient implicit var system: ActorSystem
@transient implicit var executionContext: ExecutionContextExecutor


override def asyncInvoke(input: Shipment, resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
}
case Failure(x) => {
resultFuture.complete(Iterable(Either(x)).asJavaCollection)
}
}
}

override def open(parameters: Configuration): Unit = {
super.open(parameters)
system = ActorSystem("my-system")
executionContext = system.dispatcher
materializer = ActorMaterializer()
}
}
```

Aslo the Usage of that class, I has error, I guess its because of java/scala issue. In flink doc, for java code they use RichAsyncFunction and for scala they use AsyncFunction:
```
//    AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ###### error Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient

```

### 

So I try to fix my current code again with transient field and move it into constructor:
```


class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

@transient implicit lazy val system = {
PPLogger.getActivityLogger.info("###########INIT ------------------- ")
ActorSystem("my-system")
}
@transient implicit lazy val executionContext = {
system.dispatcher
}
@transient implicit lazy val materializer: ActorMaterializer = {
PPLogger.getActivityLogger.info("###########DONE ------------------- ")
ActorMaterializer()
}

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
// PPLogger.getActivityLogger.info("###########INIT ------------------- ")
// implicit val system = ActorSystem("my-system")
// implicit val executionContext = system.dispatcher
// implicit val materializer: ActorMaterializer = ActorMaterializer()
// PPLogger.getActivityLogger.info("###########DONE ------------------- ")

val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
And its run ok. The log was print only one. 

I still asking about this because I haven’t understand the term `That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability `. I though for all the code executed inside TaskManger?

Thanks for being patient with me, till here

Andy,




On Apr 11, 2019, at 7:12 PM, Till Rohrmann <[hidden email]> wrote:

Hi Andy,

without being an expert of Akka's http client, I think you should not create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would recommend you instead is to implement a `RichAsyncFunction` with a transient field for `ActorMaterializer` which you initialize in the `RichAsyncFunction#open` method. That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability and you make it much more efficient because you don't create a new `ActorSystem` for every request.

Cheers,
Till

On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[hidden email]> wrote:
Hi guys,

I’m try to decide which http client to go with Flink, currently I tested with scalaj and akka http client and both work ok with our current dev environment.
For scalaj its is pretty straight forward since its is just calling an http request with its timeout.

For akka http client its a bit more complicated (I’m new to scala and all), so I’m asking if am I doing it right by create a AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

PPLogger.getActivityLogger.info("###########INIT ------------------- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###########DONE ------------------- ")


resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}

}

override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
}
}
```
I notice that I have to implicit create a bunch of variable inside the asyncInvoke method. I’m not sure if I’m doing it right, or just adding the overhead. I did try to init them in the constructor of the class but the compiler just throw a bunch of Not implemented Serializer error.

My lib:
   "com.typesafe.akka" %% "akka-http" % "10.1.8",
  "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My flink:
scala 2.12
flink 1.70



Any reply are appreciated!

Thanks a bunch

Andy,