Flink-Elasticsearch connector support for elasticsearch 2.0


Flink-Elasticsearch connector support for elasticsearch 2.0

Madhukar Thota
Does the current elasticsearch-flink connector support Elasticsearch 2.x?

-Madhu

Re: Flink-Elasticsearch connector support for elasticsearch 2.0

Maximilian Michels
Hi Madhu,

Not yet. The API has changed slightly. We'll add one very soon. In the
meantime I've created an issue to keep track of the status:

https://issues.apache.org/jira/browse/FLINK-3115

Thanks,
Max

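For reference, the client bootstrap is one of the pieces that changed between Elasticsearch 1.x and 2.x. Below is a minimal sketch of the difference, written from memory rather than taken from the connector code; the cluster name and address are placeholders.

import java.net.InetAddress;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class Es2ClientSketch {

    public static void main(String[] args) throws Exception {
        // Elasticsearch 1.x style (roughly):
        //   Settings settings = ImmutableSettings.settingsBuilder()
        //           .put("cluster.name", "my-cluster").build();
        //   TransportClient client = new TransportClient(settings);

        // Elasticsearch 2.x style:
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "my-cluster")   // placeholder cluster name
                .build();
        TransportClient client = TransportClient.builder()
                .settings(settings)
                .build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("localhost"), 9300));

        // ... use the client ...
        client.close();
    }
}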

Re: Flink-Elasticsearch connector support for elasticsearch 2.0

Madhukar Thota
I have created a working connector for Elasticsearch 2.0 based on the elasticsearch-flink connector. I am using it right now, but I would like an official connector from Flink.

ElasticsearchSink.java


import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;


public class ElasticsearchSink<T> extends RichSinkFunction<T> {

    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";

    private static final long serialVersionUID = 1L;
    private static final int DEFAULT_PORT = 9300;
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);

    /**
     * The user-specified config map that we forward to Elasticsearch when we create the Client.
     */
    private final Map<String, String> userConfig;

    /**
     * The builder that is used to construct an {@link IndexRequest} from the incoming element.
     */
    private final IndexRequestBuilder<T> indexRequestBuilder;

    /**
     * The Client that was either retrieved from a Node or is a TransportClient.
     */
    private transient Client client;

    /**
     * Bulk processor that was created using the client.
     */
    private transient BulkProcessor bulkProcessor;

    /**
     * This is set from inside the BulkProcessor listener if there were failures in processing.
     */
    private final AtomicBoolean hasFailure = new AtomicBoolean(false);

    /**
     * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
     */
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<Throwable>();

    public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
        this.userConfig = userConfig;
        this.indexRequestBuilder = indexRequestBuilder;
    }

    @Override
    public void open(Configuration configuration) {
        ParameterTool params = ParameterTool.fromMap(userConfig);
        Settings settings = Settings.settingsBuilder()
                .put(userConfig)
                .build();

        // Build a TransportClient from the semicolon-separated "esHost" list (host or host:port).
        TransportClient transportClient = TransportClient.builder().settings(settings).build();
        for (String server : params.get("esHost").split(";")) {
            String[] components = server.trim().split(":");
            String host = components[0];
            int port = DEFAULT_PORT;
            if (components.length > 1) {
                port = Integer.parseInt(components[1]);
            }

            try {
                transportClient = transportClient.addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(host), port));
            } catch (UnknownHostException e) {
                LOG.error("Could not resolve Elasticsearch host: " + host, e);
            }
        }

        List<DiscoveryNode> nodes = transportClient.connectedNodes();
        if (nodes.isEmpty()) {
            throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Connected to nodes: " + nodes.toString());
        }
        client = transportClient;

        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        // nothing to do before the bulk request is sent
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        if (response.hasFailures()) {
                            for (BulkItemResponse itemResp : response.getItems()) {
                                if (itemResp.isFailed()) {
                                    LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
                                    failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
                                }
                            }
                            hasFailure.set(true);
                        }
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        LOG.error(failure.getMessage());
                        failureThrowable.compareAndSet(null, failure);
                        hasFailure.set(true);
                    }
                });

        // This makes flush() blocking
        bulkProcessorBuilder.setConcurrentRequests(0);

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            bulkProcessorBuilder.setBulkSize(new ByteSizeValue(
                    params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
        }

        bulkProcessor = bulkProcessorBuilder.build();
    }

    @Override
    public void invoke(T element) {
        IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Emitting IndexRequest: {}", indexRequest);
        }

        bulkProcessor.add(indexRequest);
    }

    @Override
    public void close() {
        if (bulkProcessor != null) {
            bulkProcessor.close();
            bulkProcessor = null;
        }

        if (client != null) {
            client.close();
        }

        if (hasFailure.get()) {
            Throwable cause = failureThrowable.get();
            if (cause != null) {
                throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
            } else {
                throw new RuntimeException("An error occurred in ElasticsearchSink.");
            }
        }
    }

}
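
The sink above compiles against an IndexRequestBuilder interface that is not shown in this post. Presumably it is the same single-method interface shipped with the existing 1.x connector; a minimal sketch of what it would look like:

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.elasticsearch.action.index.IndexRequest;

public interface IndexRequestBuilder<T> extends Function, Serializable {

    /**
     * Builds the IndexRequest to emit for the given stream element.
     */
    IndexRequest createIndexRequest(T element, RuntimeContext ctx);
}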

In my Main Class:

Map<String, String> config = Maps.newHashMap();

//Elasticsearch Parameters

config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, parameter.get("elasticsearch.bulk.flush.max.actions","1"));
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, parameter.get("elasticsearch.bulk.flush.interval.ms","2"));
config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
config.put("esHost", parameter.get("elasticsearch.server", "localhost:9300"));

DataStreamSink<String> elastic = messageStream.rebalance().addSink(
        new ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element, runtimeContext) -> {
            // Split on spaces that are not inside double quotes; the element is expected to look
            // roughly like "<measurement,tags> <field>=<value> <timestamp>".
            String[] line = element.toLowerCase().split(" +(?=(?:([^\"]*\"){2})*[^\"]*$)");
            String measureAndTags = line[0];
            String[] kvSplit = line[1].split("=");
            String fieldName = kvSplit[0];
            String fieldValue = kvSplit[1];
            Map<String, String> tags = new HashMap<>();
            String measure = parseMeasureAndTags(measureAndTags, tags);
            // Timestamp is presumably in nanoseconds; convert to milliseconds.
            long time = (long) (Double.valueOf(line[2]) / 1000000);

            Map<String, Object> test = new HashMap<>();
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

            test.put(fieldName, setValue(fieldValue));
            test.put("tags", tags);
            test.put("measurement", measure);
            test.put("@timestamp", dateFormat.format(new Date(time)));

            return Requests.indexRequest()
                    .index("metrics")
                    .type("test")
                    .source(new Gson().toJson(test).toLowerCase());
        }));

-Madhu
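
For readers who want to try the sink outside of the metrics pipeline above, here is a stripped-down usage sketch; the cluster name, host, index and type below are placeholders, not values from this thread:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.elasticsearch.client.Requests;

public class ElasticsearchSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> messages = env.fromElements("hello", "world");

        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "my-cluster");   // placeholder cluster name
        config.put("esHost", "localhost:9300");     // placeholder host

        // Wrap each line in a one-field JSON document and index it.
        messages.addSink(new ElasticsearchSink<>(config,
                (IndexRequestBuilder<String>) (element, ctx) -> {
                    Map<String, Object> json = new HashMap<>();
                    json.put("line", element);
                    return Requests.indexRequest()
                            .index("my-index")   // placeholder index
                            .type("my-type")     // placeholder type
                            .source(json);
                }));

        env.execute("Elasticsearch 2.0 sink example");
    }
}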



Re: Flink-Elasticsearch connector support for elasticsearch 2.0

Maximilian Michels
Hi Madhu,

Great. Do you want to contribute it back via a GitHub pull request? If not, that's also fine. We will try to look into the 2.0 connector next week.

Best,
Max


Re: Flink-Elasticsearch connector support for elasticsearch 2.0

Madhukar Thota
Sure. I can submit the pull request.



Re: Flink-Elasticsearch connector support for elasticsearch 2.0

Flavio Pompermaier

Wouldn't it be better to have both connectors for ES, one for 1.x and another for 2.x?



Re: Flink-Elasticsearch connector support for elasticsearch 2.0

Deepak Sharma
In reply to this post by Madhukar Thota
Hi Madhu
Would you be able to share your use case for Elasticsearch with Flink here?

Thanks
Deepak


Re: Flink-Elasticsearch connector support for elasticsearch 2.0

Maximilian Michels
 
> Wouldn't it be better to have both connectors for ES, one for 1.x and another for 2.x?

IMHO that's the way to go.

Thanks Madhukar!

Cheers,
Max
