data enrichment via endpoint, serializable issue

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

data enrichment via endpoint, serializable issue

Steffen Wohlers
Hi all,

I’m new to Apache Flink and I have the following issue:

I would like to enrich data via map function. For that I call a method which calls an endpoint but I get following error message 

„The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)“ …
„Caused by: java.io.NotSerializableException: com.aylien.textapi.TextAPIClient“

Is there a smart way to fix that issue?

Regards,

Steffen


Map Function:
DataStream<TweetSentiment> tweetSentimentDataStream = noRTDataStream
.map(new MapFunction<Tweet, TweetSentiment>() {
@Override
public TweetSentiment map(Tweet tweet) throws Exception {
String polarity = "good";
polarity = test.testMethod();
polarity = sentimentAnalysis.sentiment(tweet.getText());
return new TweetSentiment(tweet, polarity, 0);
}
});

Class:

public class SentimentAnalysis implements Serializable {

private TextAPIClient _sentimentClient;

public SentimentAnalysis () {
_sentimentClient = new TextAPIClient(xxx", xxx");
}

public String sentiment(String text) throws Exception{
SentimentParams sentimentParams = new SentimentParams(text, null, null);
Sentiment sentiment = _sentimentClient.sentiment(sentimentParams);

return sentiment.getPolarity();
}
(Analysis via Aylien)
Reply | Threaded
Open this post in threaded view
|

Re: data enrichment via endpoint, serializable issue

Xingcan Cui
Hi Steffen,

You could make the class `TextAPIClient` serializable, or use `RichMapFunction` [1] and instantiate all the required objects in its `open()` method.


Best,
Xingcan

On Jul 19, 2018, at 6:56 PM, Steffen Wohlers <[hidden email]> wrote:

Hi all,

I’m new to Apache Flink and I have the following issue:

I would like to enrich data via map function. For that I call a method which calls an endpoint but I get following error message 

„The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)“ …
„Caused by: java.io.NotSerializableException: com.aylien.textapi.TextAPIClient“

Is there a smart way to fix that issue?

Regards,

Steffen


Map Function:
DataStream<TweetSentiment> tweetSentimentDataStream = noRTDataStream
.map(new MapFunction<Tweet, TweetSentiment>() {
@Override
public TweetSentiment map(Tweet tweet) throws Exception {
String polarity = "good";
polarity = test.testMethod();
polarity = sentimentAnalysis.sentiment(tweet.getText());
return new TweetSentiment(tweet, polarity, 0);
}
});

Class:

public class SentimentAnalysis implements Serializable {

private TextAPIClient _sentimentClient;

public SentimentAnalysis () {
_sentimentClient = new TextAPIClient(xxx", xxx");
}

public String sentiment(String text) throws Exception{
SentimentParams sentimentParams = new SentimentParams(text, null, null);
Sentiment sentiment = _sentimentClient.sentiment(sentimentParams);

return sentiment.getPolarity();
}
(Analysis via Aylien)

Reply | Threaded
Open this post in threaded view
|

Re: data enrichment via endpoint, serializable issue

Steffen Wohlers
Hi Xingcan,

option two RichMapFunction works , thanks a lot!


Thanks,
Steffen

On 19. Jul 2018, at 13:59, Xingcan Cui <[hidden email]> wrote:

Hi Steffen,

You could make the class `TextAPIClient` serializable, or use `RichMapFunction` [1] and instantiate all the required objects in its `open()` method.


Best,
Xingcan

On Jul 19, 2018, at 6:56 PM, Steffen Wohlers <[hidden email]> wrote:

Hi all,

I’m new to Apache Flink and I have the following issue:

I would like to enrich data via map function. For that I call a method which calls an endpoint but I get following error message 

„The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)“ …
„Caused by: java.io.NotSerializableException: com.aylien.textapi.TextAPIClient“

Is there a smart way to fix that issue?

Regards,

Steffen


Map Function:
DataStream<TweetSentiment> tweetSentimentDataStream = noRTDataStream
.map(new MapFunction<Tweet, TweetSentiment>() {
@Override
public TweetSentiment map(Tweet tweet) throws Exception {
String polarity = "good";
polarity = test.testMethod();
polarity = sentimentAnalysis.sentiment(tweet.getText());
return new TweetSentiment(tweet, polarity, 0);
}
});

Class:

public class SentimentAnalysis implements Serializable {

private TextAPIClient _sentimentClient;

public SentimentAnalysis () {
_sentimentClient = new TextAPIClient(xxx", xxx");
}

public String sentiment(String text) throws Exception{
SentimentParams sentimentParams = new SentimentParams(text, null, null);
Sentiment sentiment = _sentimentClient.sentiment(sentimentParams);

return sentiment.getPolarity();
}
(Analysis via Aylien)