ElasticsearchSink Serialization Error

vijikarthi
Hello,

I am seeing the error below when I try to use ElasticsearchSink. It complains about serialization, and it looks like the problem comes from my "IndexRequestBuilder" implementation. I have tried the suggestion in http://stackoverflow.com/questions/33246864/elasticsearch-sink-seralizability (changing the anonymous class to a concrete class), but it did not help. However, when I call "ElasticsearchSink<>(config, transports, null)", passing "null" for the "IndexRequestBuilder", I don't see the serialization error. This suggests the problem is in the IndexRequestBuilder implementation, but I have not been able to narrow it down further.

Could someone please let me know the right way to use the ElasticsearchSink() API?

Build Details
Flink 1.2.0
Elasticsearch 5.3.0

Error Message

```
org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1539)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
        at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1076)
```
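The stack trace shows the exception is raised by Flink's ClosureCleaner when the sink is added. As far as I understand it, that check boils down to attempting plain Java serialization of the sink function and reporting any failure. The sketch below reproduces the idea using only the JDK; it is not Flink's code, and all class names in it are hypothetical. It also hints at why passing null for the builder avoids the error: a null reference serializes fine, whatever its declared type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {

    /** Returns true if Java serialization of obj succeeds (a rough stand-in for Flink's check). */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            // The message names the first non-serializable class encountered in the object graph
            System.out.println("Not serializable: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical stand-in for any object that does not implement Serializable
    static class Unserializable {}

    // Implements Serializable, but references a non-serializable object, so serialization fails
    static class BadFunction implements Serializable {
        private final Unserializable helper = new Unserializable();
    }

    // Same field type, but the reference is null, so serialization succeeds
    static class GoodFunction implements Serializable {
        private final String index = "results";
        private final Unserializable helper = null;
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new BadFunction()));
        System.out.println(isSerializable(new GoodFunction()));
    }
}
```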

Code Snippet

```
private ElasticsearchSink<Result> sinkToElasticSearch(AppConfiguration appConfiguration) throws Exception {

    String host = appConfiguration.getPipeline().getElasticSearch().getHost();
    int port = appConfiguration.getPipeline().getElasticSearch().getPort();
    String cluster = appConfiguration.getPipeline().getElasticSearch().getCluster();

    Map<String, String> config = new HashMap<>();
    config.put("bulk.flush.max.actions", "1"); // flush after every element
    config.put("cluster.name", cluster);

    List<TransportAddress> transports = new ArrayList<>();
    transports.add(new InetSocketTransportAddress(host, port));

    return new ElasticsearchSink<>(config, transports, new ResultIndexRequestBuilder(appConfiguration));
}

// Note: declared inside the enclosing class, i.e. as a non-static inner class
public class ResultIndexRequestBuilder implements IndexRequestBuilder<Result>, Serializable {

    private String index;
    private String type;
    //private transient Gson gson = new Gson();

    public ResultIndexRequestBuilder() {}

    public ResultIndexRequestBuilder(AppConfiguration appConfiguration) {
        index = appConfiguration.getPipeline().getElasticSearch().getIndex();
        type = appConfiguration.getPipeline().getElasticSearch().getType();
    }

    @Override
    public IndexRequest createIndexRequest(Result result, RuntimeContext ctx) {
        Gson gson = new Gson();
        String resultAsJson = gson.toJson(result);
        System.out.println(resultAsJson);

        // Store the JSON-serialized Result under a single "data" field
        Map<String, String> jsonMap = new HashMap<>();
        jsonMap.put("data", resultAsJson);

        return Requests.indexRequest()
                .index(index)
                .type(type)
                .source(jsonMap);
    }
}
```
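A side note on the commented-out "private transient Gson gson = new Gson();" line: Java serialization skips transient fields, and it does not re-run field initializers when an object is deserialized, so on the worker side that field would come back as null. A common pattern is to create such a helper lazily on first use. The sketch below shows this with plain JDK serialization; JsonHelper and Builder are hypothetical stand-ins (JsonHelper plays the role of a non-serializable helper such as Gson, which to my knowledge does not implement Serializable).

```java
import java.io.*;

public class TransientFieldDemo {

    // Hypothetical non-serializable helper (stand-in for something like Gson)
    static class JsonHelper {
        String toJson(String value) { return "{\"data\":\"" + value + "\"}"; }
    }

    static class Builder implements Serializable {
        private final String index;
        // transient: skipped by serialization, so it is null after deserialization
        private transient JsonHelper json;

        Builder(String index) { this.index = index; }

        String build(String value) {
            if (json == null) {          // lazily (re)create on first use, e.g. after deserialization
                json = new JsonHelper();
            }
            return index + ":" + json.toJson(value);
        }
    }

    /** Serializes and deserializes obj, roughly what happens when a function is shipped to a worker. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Builder original = new Builder("results");
        original.build("warm-up");            // initializes the transient helper
        Builder copy = roundTrip(original);   // would fail if 'json' were not transient
        System.out.println(copy.build("x"));  // helper is re-created lazily on the copy
    }
}
```

Creating the helper once per instance this way avoids both the serialization failure and allocating a new helper for every record, as the snippet above currently does inside createIndexRequest().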

Regards
Vijay

Re: ElasticsearchSink Serialization Error

Aljoscha Krettek
Hi,
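ResultIndexRequestBuilder is a non-static inner class. This means every instance holds an implicit reference to its enclosing instance, so when Flink serializes the sink it also tries to serialize that enclosing object, and that is what fails. If you make it a static nested class ("public static class ResultIndexRequestBuilder ..."), your code should work.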
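To see the effect of the implicit outer reference in isolation, here is a minimal JDK-only sketch. InnerClassDemo plays the role of the job class that contains sinkToElasticSearch(); that this class is not Serializable is an assumption, consistent with the error message. Serializing an instance of the non-static inner class drags in the enclosing instance and fails, while the static nested class serializes on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class InnerClassDemo {   // hypothetical enclosing "job" class; deliberately NOT Serializable

    // Non-static inner class: each instance carries a hidden reference to an InnerClassDemo
    class InnerBuilder implements Serializable {
        String index = "results";
    }

    // Static nested class: no reference to any enclosing instance
    static class StaticBuilder implements Serializable {
        String index = "results";
    }

    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;   // for InnerBuilder, e.getMessage() names the enclosing class
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InnerClassDemo job = new InnerClassDemo();
        System.out.println("inner:  " + serializes(job.new InnerBuilder()));  // false
        System.out.println("static: " + serializes(new StaticBuilder()));     // true
    }
}
```

In the code from the question, the equivalent fix is declaring the builder as "public static class ResultIndexRequestBuilder", or moving it into its own top-level file.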

Best,
Aljoscha
On 28. Apr 2017, at 04:57, Vijay Srinivasaraghavan <[hidden email]> wrote:
