Http Requests from Flink

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Http Requests from Flink

Ulf Thomas
Hello,

I've been trying to perform HTTP requests from a Flink Program but I wasn't successful :-(.

Does anybody here has done this before and can point me to an working library?

I've attached a small demo project in case someone wants to try to solve this.

Best,

--
--
Ulf Thomas
Software Developer
relayr

flink-http-request.zip (14K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Http Requests from Flink

alex.decastro

Hi Ulf,

I’ve had similar problem, before but from a sink perspective: I had to create a HTTP sink for a Kafka REST API. I’ve used scalaj-http https://github.com/scalaj/scalaj-http which is a wrapper for the corresponding Java lib.

 

For example,

https://github.com/scalaj/scalaj-http

 

For example

 

class HttpSink extends SinkFunction[Message]{
  private val secretkey = new GetToken().token

  def sendMessage(message: Message):String = Http("http://XXX.XXX.XX.XXX:5000/api/message") // <-- GLOBAL var
    .header("Content-Type","application/json")
    .header("Authorization", s"Bearer $secretkey")
    .postData(message.data).asString.body

  @throws[Exception]
  override def invoke(message: Message): Unit = {
    log.info(sendMessage(message))
  }
}

 

I image for a http source, you could send a request to the REST API periodically and convert the micro-batches into a stream. I’d love to know about other alternatives.

 

Cheers,

Alex

From: Ulf Thomas <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Thursday, March 2, 2017 at 12:58 PM
To: "[hidden email]" <[hidden email]>
Subject: Http Requests from Flink

 

Hello,

 

I've been trying to perform HTTP requests from a Flink Program but I wasn't successful :-(.

 

Does anybody here has done this before and can point me to an working library?

 

I've attached a small demo project in case someone wants to try to solve this.

 

Best,

 

--

--
Ulf Thomas
Software Developer
relayr

This email and any attachments may contain information which is confidential and/or privileged. The information is intended exclusively for the addressee and the views expressed may not be official policy, but the personal views of the originator. If you are not the intended recipient, be aware that any disclosure, copying, distribution or use of the contents is prohibited. If you have received this email and any file transmitted with it in error, please notify the sender by telephone or return email immediately and delete the material from your computer. Internet communications are not secure and Lab49 is not responsible for their abuse by third parties, nor for any alteration or corruption in transmission, nor for any damage or loss caused by any virus or other defect. Lab49 accepts no liability or responsibility arising out of or in any way connected to this email.
Reply | Threaded
Open this post in threaded view
|

Re: Http Requests from Flink

Yassine MARZOUGUI
Hi Ulf,

I've done HTTP requests in Flink using OkHttp library. I found it practical and easy to use. Here is how I used it to make a POST request for each incoming element in the stream and ouput the response:

DataStream<String> stream = ....

stream.map(new RichMapFunction<String, String>() {

    OkHttpClient client;

    @Override
    public void open(Configuration config) throws IOException {
        client = new OkHttpClient();
    }

    @Override
    public String map(String in) throws Exception {

        okhttp3.Request request = new okhttp3.Request.Builder()
                    .url("http://localhost:8080")
                    .post(RequestBody.create(MediaType.parse("text/plain; charset=utf-8"), in))
                    .build();
            Response response = client.newCall(request).execute();
            if (response.code() != 200) {
                throw new Exception("Failed request");
            }
            String result = response.body().string();
            return result;
    }
})

I hope this helps.

Best,
Yassine


2017-03-02 14:17 GMT+01:00 Alex De Castro <[hidden email]>:

Hi Ulf,

I’ve had similar problem, before but from a sink perspective: I had to create a HTTP sink for a Kafka REST API. I’ve used scalaj-http https://github.com/scalaj/scalaj-http which is a wrapper for the corresponding Java lib.

 

For example,

https://github.com/scalaj/scalaj-http

 

For example

 

class HttpSink extends SinkFunction[Message]{
  private val secretkey = new GetToken().token

  def sendMessage(message: Message):String = Http("http://XXX.XXX.XX.XXX:5000/api/message") // <-- GLOBAL var
    .header("Content-Type","application/json")
    .header("Authorization", s"Bearer $secretkey")
    .postData(message.data).asString.body

  @throws[Exception]
  override def invoke(message: Message): Unit = {
    log.info(sendMessage(message))
  }
}

 

I image for a http source, you could send a request to the REST API periodically and convert the micro-batches into a stream. I’d love to know about other alternatives.

 

Cheers,

Alex

From: Ulf Thomas <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Thursday, March 2, 2017 at 12:58 PM
To: "[hidden email]" <[hidden email]>
Subject: Http Requests from Flink

 

Hello,

 

I've been trying to perform HTTP requests from a Flink Program but I wasn't successful :-(.

 

Does anybody here has done this before and can point me to an working library?

 

I've attached a small demo project in case someone wants to try to solve this.

 

Best,

 

--

--
Ulf Thomas
Software Developer
relayr

This email and any attachments may contain information which is confidential and/or privileged. The information is intended exclusively for the addressee and the views expressed may not be official policy, but the personal views of the originator. If you are not the intended recipient, be aware that any disclosure, copying, distribution or use of the contents is prohibited. If you have received this email and any file transmitted with it in error, please notify the sender by telephone or return email immediately and delete the material from your computer. Internet communications are not secure and Lab49 is not responsible for their abuse by third parties, nor for any alteration or corruption in transmission, nor for any damage or loss caused by any virus or other defect. Lab49 accepts no liability or responsibility arising out of or in any way connected to this email.