java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.


java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.

chandresh pancholi
Flow:
Producer -> Kafka (Avro) -> Flink Kafka connector with Avro deserializer -> Flink -> ES
Kafka: latest version
Flink: 1.4.2
ES: 5.5.2
@Service
public class FlinkStream {

    @Autowired
    private ClientService clientService;

    @Autowired
    private AppConfig appConfig;

    @PostConstruct
    public void init() {
        List<Client> clientList = clientService.getAllEnableTenant();
        clientList.stream().forEach(client -> {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            ConfluentAvroDeserializationSchema schema = new ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
            Properties properties = buildKafkaConsumerProperties(client.getTopic());

            FlinkKafkaConsumer011<String> flinkKafkaConsumer = new FlinkKafkaConsumer011<String>(client.getTopic(), schema, properties);

            DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
            writeTOEs(kafkaStream, client);
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public Properties buildKafkaConsumerProperties(String clientTopic) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getKafkaBootstrapServers());
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, appConfig.getKafkaFetchMinBytes());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, appConfig.getKafkaAutoCommit());
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, appConfig.getKafkaAutoCommitInterval());
        properties.put("specific.avro.reader", true);
        properties.put("schema.registry.url", appConfig.getKafkaSchemaRegistry());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaKeyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaValueDeserializer());

        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, clientTopic);

        return properties;
    }

    public void writeTOEs(DataStream dataStream, Client client) {
        HashMap<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", appConfig.getElasticsearchCluster());

        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        for (String tokenizedHost : appConfig.getElasticsearchHost().split(",")) {
            try {
                transportAddresses.add(new InetSocketAddress(InetAddress.getByName(tokenizedHost), 9300));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }

        dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                json.put("data", element);

                return Requests.indexRequest()
                        .index(client.getIndexName())
                        .type(client.getIndexName() + "-type")
                        .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }));
    }
}



--
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Contact: 08951803660

Re: java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.

Chesnay Schepler
Your anonymous ElasticsearchSinkFunction accesses the client variable that is defined outside of the function.
For the function to be serializable, that Client must be serializable as well.

I suggest turning your function into a named class with a constructor that accepts the indexName.
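
Roughly, such a class could look like this (untested sketch; the class name ClientIndexSink is just an example, the request-building logic is copied from your snippet):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    // Sketch only: a named sink function that captures nothing but the index name,
    // which is a plain String and therefore serializable.
    public class ClientIndexSink implements ElasticsearchSinkFunction<String> {

        private final String indexName;

        public ClientIndexSink(String indexName) {
            this.indexName = indexName;
        }

        private IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                    .index(indexName)
                    .type(indexName + "-type")
                    .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }

The sink registration in writeTOEs would then become something like:

    dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses, new ClientIndexSink(client.getIndexName())));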

Re: java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.

chandresh pancholi
Hi,

Thank you for the response. I have made the suggested changes, but now I am getting "Caused by: java.lang.NoClassDefFoundError: scala/Product$class".
I am running my application on Spring Boot 2.0. Is there a better platform to run Flink code?

Caused by: java.lang.NoClassDefFoundError: scala/Product$class
    at akka.util.Timeout.<init>(Timeout.scala:13) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:328) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:683) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:245) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:288) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:263) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.create(ActorSystem.scala:191) ~[akka-actor_2.11-2.4.20.jar:na]

--
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Contact: 08951803660