Does the current Elasticsearch Flink connector support Elasticsearch 2.x?
-Madhu
Hi Madhu,
Not yet. The API has changed slightly. We'll add one very soon. In the meantime, I've created an issue to keep track of the status: https://issues.apache.org/jira/browse/FLINK-3115

Thanks,
Max

On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota <[hidden email]> wrote:
> Does the current Elasticsearch Flink connector support Elasticsearch 2.x?
>
> -Madhu
I have created a working connector for Elasticsearch 2.0 based on the existing Elasticsearch Flink connector. I am using it right now, but I would prefer an official connector from Flink.

ElasticsearchSink.java

In my Main Class:

Map<String, String> config = Maps.newHashMap();

-Madhu

On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <[hidden email]> wrote:
> Hi Madhu,
Hi Madhu,
Great. Do you want to contribute it back via a GitHub pull request? If not, that's also fine. We will try to look into the 2.0 connector next week.

Best,
Max

On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota <[hidden email]> wrote:
> I have created a working connector for Elasticsearch 2.0 based on the existing
> Elasticsearch Flink connector. I am using it right now, but I would prefer an
> official connector from Flink.
>
> ElasticsearchSink.java
>
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.net.InetAddress;
> import java.net.UnknownHostException;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicReference;
>
> import org.elasticsearch.action.bulk.BulkItemResponse;
> import org.elasticsearch.action.bulk.BulkProcessor;
> import org.elasticsearch.action.bulk.BulkRequest;
> import org.elasticsearch.action.bulk.BulkResponse;
> import org.elasticsearch.action.index.IndexRequest;
> import org.elasticsearch.client.Client;
> import org.elasticsearch.client.transport.TransportClient;
> import org.elasticsearch.cluster.node.DiscoveryNode;
> import org.elasticsearch.common.settings.Settings;
> import org.elasticsearch.common.transport.InetSocketTransportAddress;
> import org.elasticsearch.common.unit.ByteSizeUnit;
> import org.elasticsearch.common.unit.ByteSizeValue;
> import org.elasticsearch.common.unit.TimeValue;
>
> public class ElasticsearchSink<T> extends RichSinkFunction<T> {
>
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
>     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
>
>     private static final long serialVersionUID = 1L;
>     private static final int DEFAULT_PORT = 9300;
>     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
>
>     /**
>      * The user-specified config map that we forward to Elasticsearch when we create the Client.
>      */
>     private final Map<String, String> userConfig;
>
>     /**
>      * The builder that is used to construct an {@link IndexRequest} from the incoming element.
>      */
>     private final IndexRequestBuilder<T> indexRequestBuilder;
>
>     /**
>      * The Client that was either retrieved from a Node or is a TransportClient.
>      */
>     private transient Client client;
>
>     /**
>      * Bulk processor that was created using the client.
>      */
>     private transient BulkProcessor bulkProcessor;
>
>     /**
>      * This is set from inside the BulkProcessor listener if there were failures in processing.
>      */
>     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
>
>     /**
>      * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
>      */
>     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<Throwable>();
>
>     public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
>         this.userConfig = userConfig;
>         this.indexRequestBuilder = indexRequestBuilder;
>     }
>
>     @Override
>     public void open(Configuration configuration) {
>         ParameterTool params = ParameterTool.fromMap(userConfig);
>         Settings settings = Settings.settingsBuilder()
>                 .put(userConfig)
>                 .build();
>
>         TransportClient transportClient = TransportClient.builder().settings(settings).build();
>         // "esHost" is a ';'-separated list of host[:port] entries; the port defaults to 9300.
>         for (String server : params.get("esHost").split(";")) {
>             String[] components = server.trim().split(":");
>             String host = components[0];
>             int port = DEFAULT_PORT;
>             if (components.length > 1) {
>                 port = Integer.parseInt(components[1]);
>             }
>
>             try {
>                 transportClient = transportClient.addTransportAddress(
>                         new InetSocketTransportAddress(InetAddress.getByName(host), port));
>             } catch (UnknownHostException e) {
>                 // Skip this host; the connectedNodes() check below fails if no host is reachable.
>                 LOG.error("Could not resolve Elasticsearch host {}", host, e);
>             }
>         }
>
>         List<DiscoveryNode> nodes = transportClient.connectedNodes();
>         if (nodes.isEmpty()) {
>             throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
>         } else if (LOG.isDebugEnabled()) {
>             LOG.debug("Connected to nodes: " + nodes.toString());
>         }
>         client = transportClient;
>
>         BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
>                 client,
>                 new BulkProcessor.Listener() {
>                     @Override
>                     public void beforeBulk(long executionId, BulkRequest request) {
>                     }
>
>                     @Override
>                     public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
>                         if (response.hasFailures()) {
>                             for (BulkItemResponse itemResp : response.getItems()) {
>                                 if (itemResp.isFailed()) {
>                                     LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
>                                     failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
>                                 }
>                             }
>                             hasFailure.set(true);
>                         }
>                     }
>
>                     @Override
>                     public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
>                         LOG.error("Bulk request to Elasticsearch failed", failure);
>                         failureThrowable.compareAndSet(null, failure);
>                         hasFailure.set(true);
>                     }
>                 });
>
>         // This makes flush() blocking
>         bulkProcessorBuilder.setConcurrentRequests(0);
>
>         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
>             bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
>         }
>
>         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
>             bulkProcessorBuilder.setBulkSize(new ByteSizeValue(
>                     params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
>         }
>
>         if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
>             bulkProcessorBuilder.setFlushInterval(
>                     TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
>         }
>
>         bulkProcessor = bulkProcessorBuilder.build();
>     }
>
>     @Override
>     public void invoke(T element) {
>         IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
>
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Emitting IndexRequest: {}", indexRequest);
>         }
>
>         bulkProcessor.add(indexRequest);
>     }
>
>     @Override
>     public void close() {
>         if (bulkProcessor != null) {
>             bulkProcessor.close();
>             bulkProcessor = null;
>         }
>
>         if (client != null) {
>             client.close();
>             client = null;
>         }
>
>         if (hasFailure.get()) {
>             Throwable cause = failureThrowable.get();
>             if (cause != null) {
>                 throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
>             } else {
>                 throw new RuntimeException("An error occurred in ElasticsearchSink.");
>             }
>         }
>     }
> }
>
> In my Main Class:
>
>     Map<String, String> config = Maps.newHashMap();
>
>     // Elasticsearch parameters
>     config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
>             parameter.get("elasticsearch.bulk.flush.max.actions", "1"));
>     config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
>             parameter.get("elasticsearch.bulk.flush.interval.ms", "2"));
>     config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
>     config.put("esHost", parameter.get("elasticsearch.server", "localhost:9300"));
>
>     DataStreamSink<String> elastic = messageStream.rebalance().addSink(new ElasticsearchSink<>(config,
>             (IndexRequestBuilder<String>) (element, runtimeContext) -> {
>                 // Split on runs of spaces that are outside double-quoted sections.
>                 String[] line = element.toLowerCase().split(" +(?=(?:([^\"]*\"){2})*[^\"]*$)");
>                 String measureAndTags = line[0];
>                 String[] kvSplit = line[1].split("=", 2);
>                 String fieldName = kvSplit[0];
>                 String fieldValue = kvSplit[1];
>                 Map<String, String> tags = new HashMap<>();
>                 String measure = parseMeasureAndTags(measureAndTags, tags);
>                 // The timestamp is in nanoseconds; integer division converts it to milliseconds.
>                 long time = Long.parseLong(line[2]) / 1_000_000L;
>
>                 Map<String, Object> test = new HashMap<>();
>                 DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>                 dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
>
>                 test.put(fieldName, setValue(fieldValue));
>                 test.put("tags", tags);
>                 test.put("measurement", measure);
>                 test.put("@timestamp", dateFormat.format(new Date(time)));
>
>                 return Requests.indexRequest()
>                         .index("metrics")
>                         .type("test")
>                         .source(new Gson().toJson(test).toLowerCase());
>             }));
>
> -Madhu
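The `open()` method in the quoted sink reads `esHost` as a `;`-separated list of `host[:port]` entries and falls back to port 9300. The following stdlib-only sketch pulls that parsing out so it can be unit-tested without a cluster. The names `EsHostParser` and `parseHosts` are hypothetical, and, unlike the posted code, the sketch fails fast when the list is empty instead of logging and continuing. It uses `InetSocketAddress.createUnresolved`, so no DNS lookup happens at parse time.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: parses an "esHost" value such as "es1:9300;es2" into socket addresses. */
public class EsHostParser {

    // Default Elasticsearch transport port, as in the posted sink.
    static final int DEFAULT_PORT = 9300;

    static List<InetSocketAddress> parseHosts(String esHost) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String server : esHost.split(";")) {
            String entry = server.trim();
            if (entry.isEmpty()) {
                continue; // tolerate trailing or doubled separators
            }
            // Assumes "host" or "host:port"; IPv6 literals are not handled.
            String[] components = entry.split(":");
            int port = components.length > 1 ? Integer.parseInt(components[1].trim()) : DEFAULT_PORT;
            // Unresolved: no DNS lookup here, resolution is left to the caller.
            addresses.add(InetSocketAddress.createUnresolved(components[0].trim(), port));
        }
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("No Elasticsearch hosts in esHost: '" + esHost + "'");
        }
        return addresses;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parseHosts("es1:9301; es2")) {
            System.out.println(address.getHostString() + ":" + address.getPort());
        }
    }
}
```

Failing fast on a bad `esHost` surfaces configuration mistakes when the job starts, rather than only through the later "not connected to any Elasticsearch nodes" check.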
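The quoted sink records failures from the asynchronous `BulkProcessor.Listener` callbacks and rethrows them on the task thread in `close()`. Below is a minimal stdlib sketch of that hand-off. `FirstFailureTracker` is a hypothetical name. The sketch also folds the posted `AtomicBoolean`/`AtomicReference` pair into a single `AtomicReference`, which assumes (as in the posted listener) that a cause is recorded whenever the failure flag is set. `compareAndSet(null, failure)` keeps the first failure and ignores later ones.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper mirroring how the posted sink hands listener failures to the task thread. */
public class FirstFailureTracker {

    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    /** Called from the asynchronous bulk listener; only the first failure is kept. */
    public void record(Throwable failure) {
        firstFailure.compareAndSet(null, failure);
    }

    /** Called on the task thread (e.g. in close(), or also in invoke()) to surface a failure. */
    public void checkAndThrow() {
        Throwable cause = firstFailure.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
        }
    }

    public static void main(String[] args) {
        FirstFailureTracker tracker = new FirstFailureTracker();
        tracker.checkAndThrow(); // nothing recorded yet, so this returns normally
        tracker.record(new IllegalStateException("first"));
        tracker.record(new IllegalStateException("second"));
        try {
            tracker.checkAndThrow();
        } catch (RuntimeException e) {
            System.out.println("cause: " + e.getCause().getMessage()); // prints "cause: first"
        }
    }
}
```

Calling `checkAndThrow()` from `invoke()` as well would be a design choice not in the post. It would fail the job on the next element instead of waiting until `close()`.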
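The `IndexRequestBuilder` lambda in the quoted main class turns one space-separated line (measurement plus tags, a single `field=value`, a nanosecond timestamp) into a map that is then serialized to JSON. The post does not state the input format, and it does not show `parseMeasureAndTags` or `setValue`. This stdlib-only sketch therefore assumes lines like `cpu,host=server01 value=0.64 1449273600000000000`, uses a hypothetical `parseMeasureAndTags` for a `measurement,tag=value,...` layout, and keeps the field value as a string. Gson and the Elasticsearch request are left out so the transformation can be tested in isolation.

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

/** Hypothetical, dependency-free version of the document-building lambda from the post. */
public class MetricLineParser {

    // Splits on runs of spaces that are not inside double quotes (regex taken from the post).
    private static final String SPLIT_OUTSIDE_QUOTES = " +(?=(?:([^\"]*\"){2})*[^\"]*$)";

    /** Hypothetical stand-in for parseMeasureAndTags: "cpu,host=a" -> "cpu", tags {host=a}. */
    static String parseMeasureAndTags(String measureAndTags, Map<String, String> tags) {
        String[] parts = measureAndTags.split(",");
        for (int i = 1; i < parts.length; i++) {
            String[] kv = parts[i].split("=", 2);
            if (kv.length == 2) {
                tags.put(kv[0], kv[1]);
            }
        }
        return parts[0];
    }

    static Map<String, Object> toDocument(String element) {
        String[] line = element.split(SPLIT_OUTSIDE_QUOTES);
        Map<String, String> tags = new HashMap<>();
        String measure = parseMeasureAndTags(line[0], tags);
        String[] kv = line[1].split("=", 2);                // single "field=value" pair
        long millis = Long.parseLong(line[2]) / 1_000_000L;  // nanoseconds -> milliseconds

        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

        Map<String, Object> doc = new HashMap<>();
        doc.put(kv[0], kv[1]); // kept as a string; the post's setValue(...) is not shown
        doc.put("tags", tags);
        doc.put("measurement", measure);
        doc.put("@timestamp", dateFormat.format(new Date(millis)));
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = toDocument("cpu,host=server01 value=0.64 1449273600000000000");
        // prints "cpu {host=server01} 2015-12-05T00:00:00.000+0000"
        System.out.println(doc.get("measurement") + " " + doc.get("tags") + " " + doc.get("@timestamp"));
    }
}
```

The sketch uses integer division on a `long` for the timestamp. Parsing the nanosecond value as a `double` first can round by a few hundred nanoseconds at this magnitude, which occasionally shifts the truncated millisecond.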
Sure, I can submit the pull request.

On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels <[hidden email]> wrote:
> Hi Madhu,
Wouldn't it be better to have both connectors for ES, one for 1.x and another for 2.x?

On 4 Dec 2015 20:55, "Madhukar Thota" <[hidden email]> wrote:
Hi Madhu,

Would you be able to describe your use case for Elasticsearch with Flink?

Thanks,
Deepak

On Sat, Dec 5, 2015 at 1:25 AM, Madhukar Thota <[hidden email]> wrote:
> Wouldn't it be better to have both connectors for ES, one for 1.x and another for 2.x?

IMHO that's the way to go. Thanks, Madhukar!

Cheers,
Max

On Sat, Dec 5, 2015 at 6:49 AM, Deepak Sharma <[hidden email]> wrote: