Hi guys,
I'm trying to decide which HTTP client to use with Flink. So far I have tested scalaj and the Akka HTTP client, and both work OK in our current dev environment. scalaj is pretty straightforward, since it is just an HTTP request call with a timeout. The Akka HTTP client is a bit more complicated (I'm new to Scala and all), so I'm asking whether I'm doing it right by creating an AsyncFunction like this:
```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
```
I noticed that I have to create a bunch of implicit variables inside the asyncInvoke method. I'm not sure whether I'm doing it right or just adding overhead. I did try to initialize them in the constructor of the class, but the compiler just threw a bunch of "Not implemented Serializer" errors.

My libs:
"com.typesafe.akka" %% "akka-http" % "10.1.8",
"com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,

My Flink: Scala 2.12, Flink 1.7.0

Any replies are appreciated! Thanks a bunch, Andy
|
Hi Andy, without being an expert on Akka's HTTP client, I think you should not create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. Instead, I would recommend implementing a `RichAsyncFunction` with a transient field for the `ActorMaterializer`, which you initialize in the `RichAsyncFunction#open` method. That way you only create the `ActorMaterializer` on the `TaskManager` where the operator is executed, which solves the serializability problem, and it is much more efficient because you don't create a new `ActorSystem` for every request. Cheers, Till On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[hidden email]> wrote:
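A minimal sketch of this suggestion, assuming Flink 1.8+ (which ships a Scala `RichAsyncFunction` in `org.apache.flink.streaming.api.scala.async`) and akka-http 10.1.8. The class name, the `url` parameter, the `String` input and output types, the actor system name and the 5-second `toStrict` timeout are hypothetical choices for illustration. The `ActorSystem` and `ActorMaterializer` are `@transient` fields created once per parallel instance in `open()` and terminated in `close()`, rather than once per `asyncInvoke` call.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical example: GETs `url + input` and emits the body (Right) or the error (Left).
class RichAsyncHttpClient(url: String)
    extends RichAsyncFunction[String, Either[Throwable, String]] {

  // Skipped during serialization; assigned in open() on the TaskManager.
  // `= _` gives each field a default (null) value, so they are not abstract members.
  @transient private var system: ActorSystem = _
  @transient private var materializer: ActorMaterializer = _

  override def open(parameters: Configuration): Unit = {
    system = ActorSystem("async-http-client")
    materializer = ActorMaterializer()(system)
  }

  override def asyncInvoke(input: String,
                           resultFuture: ResultFuture[Either[Throwable, String]]): Unit = {
    implicit val ec: ExecutionContext = system.dispatcher
    Http()(system)
      .singleRequest(HttpRequest(uri = url + input))(materializer)
      // Read the whole entity; an unconsumed streaming entity keeps the pooled connection busy.
      .flatMap(_.entity.toStrict(5.seconds)(materializer))
      .map(_.data.utf8String)
      .onComplete {
        case Success(body) => resultFuture.complete(Iterable(Right(body)))
        case Failure(e)    => resultFuture.complete(Iterable(Left(e)))
      }
  }

  override def close(): Unit = {
    if (system != null) system.terminate()
  }
}
```

Hypothetical usage, with `import org.apache.flink.streaming.api.scala._` in scope: `AsyncDataStream.unorderedWait(stream, new RichAsyncHttpClient("http://localhost:8080/"), 5, TimeUnit.SECONDS, 2)`. Errors are emitted as `Left` values rather than failing the job, mirroring the `Either[Throwable, ...]` output type used in this thread.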
|
Hi Till, Thanks for your reply. I managed to do some experiments; some worked and some didn't. I hope you can give me a bit more insight.

Following your suggestion to implement a `RichAsyncFunction` with a transient field, I wrote the class below and got this error:
```
Class 'RichAsyncHttpClient' must either be declared abstract or implement abstract member 'executionContext: ExecutionContextExecutor' in 'com.parcelperform.util.RichAsyncHttpClient'
```
```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
```
Also, using that class gives me an error; I guess it's a Java/Scala issue. In the Flink docs, the Java code uses `RichAsyncFunction` while the Scala code uses `AsyncFunction`:
```
// AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(),5, TimeUnit.SECONDS, 2 ).print() <= ###### error Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient
```
So I tried to fix my current code again, with a transient field moved into the constructor:
```
```
And it runs OK: the log was printed only once. I'm still asking about this because I haven't understood this part of your answer: "That way you only create the `ActorMaterializer` on the `TaskManager` where the operator is executed and solve the problem of serializability". I thought all of the code was executed inside the TaskManager? Thanks for being patient with me so far, Andy
|
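Hi Andy, there is also a Scala version of the `RichAsyncFunction`. In Scala you have to specify a value for class members; a field declared without one is abstract, which is what the `executionContext` error is about. This is different from Java. User code is first instantiated on the client where you create the job topology (basically where you call `new RichAsyncHttpClient`). The code is then serialized and shipped to the cluster, where it is actually executed. Fields set in the constructor are therefore serialized along with it, while fields initialized in `open()` are only created on the TaskManager. Cheers, Till On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <[hidden email]> wrote: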
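To make the serialization point concrete, here is a small stdlib-only Scala sketch that mimics shipping a function to the cluster with plain Java serialization. `Resource` is a hypothetical stand-in for something non-serializable such as an `ActorSystem`, and `Shipping.ship` only approximates what Flink does. A resource created in the constructor is serialized with the object and fails; a `@transient lazy val`, or a `@transient var` given a default with `= _` and assigned in an `open()`-style method, is skipped and rebuilt on the receiving side.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable resource such as an ActorSystem.
class Resource {
  def call(x: Int): Int = x * 2
}

// Resource created in the constructor: it is serialized with the function, which fails.
class EagerFn extends Serializable {
  val resource = new Resource
  def apply(x: Int): Int = resource.call(x)
}

// @transient lazy val: skipped when serializing, created on first use after deserialization.
class LazyFn extends Serializable {
  @transient lazy val resource: Resource = new Resource
  def apply(x: Int): Int = resource.call(x)
}

// Mirrors RichAsyncFunction#open: `= _` gives the field a default (null) value;
// without an initializer the field would be a declared-but-undefined member and not compile.
class OpenFn extends Serializable {
  @transient private var resource: Resource = _
  def open(): Unit = { resource = new Resource }
  def apply(x: Int): Int = resource.call(x)
}

object Shipping {
  // Serialize on the "client", deserialize on the "TaskManager" (both in-process here).
  def ship[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Shipping `EagerFn` throws `java.io.NotSerializableException`, while `LazyFn` and `OpenFn` (after calling `open()`) work on the deserialized copy.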
|
Hi Till, Unfortunately I have to wait for the cluster to be upgraded to 1.8 to use that feature (https://issues.apache.org/jira/browse/FLINK-6756). Meanwhile, I could reimplement it in a copy-paste manner, but I'm still curious whether my AsyncHttpClient works well and what the downside would be when you look at it. I understand the open/close methods help with initializing and cleaning up resources, but how can we benchmark the solutions to make sure one is better than the other? What is the key to deciding here, or do we have to try it in production first? Thanks a lot again, Andy
|
Hi Andy, you can do some microbenchmarks where you instantiate your AsyncHttpClient and call its `asyncInvoke` method. A better approach, though, would be to benchmark it end-to-end by running it on a cluster with a realistic workload that you also expect to occur in production. Cheers, Till On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <[hidden email]> wrote:
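For the microbenchmark option, a rough stdlib-only harness might look like the sketch below. `AsyncMicroBench.run` is a hypothetical helper: it fires `n` calls of any `Int => Future[A]` function concurrently, waits for all of them, and reports the elapsed wall-clock time. Treat the numbers as indicative only: JIT warm-up and GC pauses distort single runs, which is why a tool such as JMH is the usual choice for serious JVM microbenchmarks, and why the end-to-end cluster test remains the more meaningful measurement.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncMicroBench {
  // Fires `n` calls concurrently, waits for all of them (up to one minute),
  // and returns the results together with the elapsed wall-clock milliseconds.
  def run[A](n: Int)(call: Int => Future[A])(implicit ec: ExecutionContext): (Seq[A], Long) = {
    val start = System.nanoTime()
    val results = Await.result(Future.sequence((1 to n).map(call)), 1.minute)
    val elapsedMs = (System.nanoTime() - start) / 1000000
    (results, elapsedMs)
  }
}
```

To compare the two designs, one could pass a function that does a hypothetical expensive setup (for example `Thread.sleep(5)`, standing in for creating an `ActorSystem`) inside every call, and another that reuses a resource created once, then compare the reported times over several repeated runs.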
|
Hi Till,
Sorry to bother you again. I managed to build and run the Akka HTTP client locally, but after deploying to a YARN node the ActorSystem can't be created; execution stops at the `ActorSystem(...)` call:
```
PPLogger.getActivityLogger.info("########### 1")
implicit val system = ActorSystem("my-system")
PPLogger.getActivityLogger.info("########### 2")
```
The line `########### 2` was never printed, and the method ended up timing out. Honestly, I don't know how to debug this case. I'm just curious how people manage to use an async HTTP client without hassle. Thanks, Andy
|
Check the logs for what Akka is logging, and verify that the port you try to bind to is free. Cheers, Till On Wed, Apr 17, 2019 at 12:50 PM Andy Hoang <[hidden email]> wrote:
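For the second check, here is a small stdlib-only sketch that could be run on the YARN node: `PortCheck.isFree` (a hypothetical helper) tries to bind a server socket to the given port and reports whether that succeeded. Which port to test depends on your Akka configuration (for example `akka.remote.netty.tcp.port` if classic remoting is enabled); the sketch makes no assumption about it.

```scala
import java.net.{InetSocketAddress, ServerSocket}
import scala.util.Try

object PortCheck {
  // True if a listening socket can be bound to host:port right now.
  // The probe socket is closed immediately, so the port is not held afterwards.
  def isFree(port: Int, host: String = "0.0.0.0"): Boolean =
    Try {
      val socket = new ServerSocket()
      try socket.bind(new InetSocketAddress(host, port))
      finally socket.close()
    }.isSuccess
}
```

A `false` result only says that something already listens on that port; the Akka logs should still tell you which component it is and why the `ActorSystem` did not start.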
|