com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException


Ahmed Nader
Hello,
I have a TwitterSource and I'm applying some transformations, such as filter and map, to the resulting stream from Twitter. I'm collecting the output in an iterator: iterator = DataStreamUtils.collect(datastream). Then, in a parallel thread, I periodically check iterator.hasNext() and print the next item. I'm using Flink 1.0.3.
The program works at the beginning and actually prints some items. However, when I leave it running for a while longer (for example, 40 seconds to 1 minute), I get two exceptions:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
Both exceptions are thrown from the line where I check iterator.hasNext().
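
One angle on the setup described above, offered as a sketch rather than a diagnosis: if the iterator returned by DataStreamUtils.collect is not safe for concurrent use (an assumption, not something confirmed in this thread), it may help to let exactly one thread call hasNext()/next() and hand the results to other threads through a queue. The SingleThreadDrain class and its method names are hypothetical, and a plain java.util.Iterator stands in for the Flink iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical helper: drains a non-thread-safe iterator on one dedicated thread. */
public class SingleThreadDrain<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();
    private final Thread worker;

    public SingleThreadDrain(Iterator<T> source) {
        // Only this worker thread ever calls hasNext()/next() on the source iterator.
        worker = new Thread(() -> {
            while (source.hasNext()) {
                buffer.add(source.next());
            }
        }, "iterator-drain");
        worker.setDaemon(true);
        worker.start();
    }

    /** Safe to call from any thread: returns whatever has arrived since the last call. */
    public List<T> pollAvailable() {
        List<T> out = new ArrayList<>();
        buffer.drainTo(out);
        return out;
    }

    /** Waits until the source is exhausted; mainly useful for finite sources and tests. */
    public void awaitDone() {
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SingleThreadDrain<String> drain =
                new SingleThreadDrain<>(Arrays.asList("a", "b", "c").iterator());
        drain.awaitDone();
        System.out.println(drain.pollAvailable()); // prints [a, b, c]
    }
}
```

With something like this, a periodically invoked request handler would call pollAvailable() instead of touching the iterator directly, so overlapping requests never read from the underlying stream at the same time.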

I would like to know why these exceptions happen in general. If anyone knows a specific solution for my program, that would be great too.
Thanks,
Ahmed

Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Till Rohrmann
Hi Ahmed,

The problem usually occurs if you use differently initialized Kryo instances, where one instance has a different set of classes registered than the other. But your data could also be corrupted, because you see an IndexOutOfBoundsException where you try to access the element at index 32 of an array with size 0.
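
To make the registration point concrete, here is a toy model in plain Java. It is not Kryo's implementation; ToyClassRegistry and its methods are invented for illustration. It only assumes that registered classes are written as small integer IDs assigned in registration order, so a reader with a different set of registrations cannot resolve an ID the writer produced.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: NOT Kryo's code, just the same idea in miniature. */
public class ToyClassRegistry {
    private final Map<Class<?>, Integer> idsByClass = new HashMap<>();
    private final List<Class<?>> classesById = new ArrayList<>();

    /** Registration order determines the numeric ID, so both sides must register identically. */
    public ToyClassRegistry register(Class<?> type) {
        if (!idsByClass.containsKey(type)) {
            idsByClass.put(type, classesById.size());
            classesById.add(type);
        }
        return this;
    }

    /** Writer side: a class is encoded as its integer ID. */
    public int idOf(Class<?> type) {
        Integer id = idsByClass.get(type);
        if (id == null) {
            throw new IllegalStateException("Class not registered: " + type.getName());
        }
        return id;
    }

    /** Reader side: an ID this instance never assigned cannot be resolved. */
    public Class<?> classOf(int id) {
        if (id < 0 || id >= classesById.size()) {
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return classesById.get(id);
    }

    public static void main(String[] args) {
        ToyClassRegistry writer =
                new ToyClassRegistry().register(String.class).register(Integer.class);
        ToyClassRegistry reader = new ToyClassRegistry().register(String.class);

        int id = writer.idOf(Integer.class); // the writer assigned ID 1 to Integer
        try {
            reader.classOf(id); // the reader only knows ID 0
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A corrupted or misaligned byte stream would look the same to the reader in this model: bytes that were never meant to be a class ID get interpreted as one.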

In order to debug the problem it would be helpful to see the full stack traces of the errors and the complete error message. Additionally, it would be helpful to see your program so that we could try to reproduce the problem.

Cheers,
Till

On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader wrote:
> [original message quoted in full]


Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Flavio Pompermaier
In reply to this post by Ahmed Nader
Hi Ahmed,
I also get the same error, which is probably caused by the KryoSerializer.
Right now I'm testing a patch for this problem, so maybe you could test it too. Unfortunately I'm using Flink 1.0.2, so I don't know whether you can use my KryoSerializer, but I think so. Essentially, I recreate the Input and Output on every serialize/deserialize call and close them afterwards.
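
As a rough illustration of why recreating the buffer on every call might help, here is a toy example using only JDK streams. It does not use Kryo's Input/Output classes, and BufferReuseDemo and its methods are hypothetical. It shows how a long-lived buffer that is not cleared after a failed call can leak stale bytes into the next record, while a fresh buffer per call cannot.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Toy illustration with JDK streams; all names here are hypothetical. */
public class BufferReuseDemo {
    /** Long-lived buffer shared across calls, like a reused serializer buffer. */
    private final ByteArrayOutputStream shared = new ByteArrayOutputStream();

    /** Writes the record in two halves and can simulate a failure in between. */
    private static void writeInto(ByteArrayOutputStream buf, String record, boolean failMidway) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        int half = bytes.length / 2;
        buf.write(bytes, 0, half);
        if (failMidway) {
            throw new IllegalStateException("simulated failure before flush");
        }
        buf.write(bytes, half, bytes.length - half);
    }

    /** Reuses the shared buffer; a failed call leaves its partial bytes behind. */
    public byte[] writeReused(String record, boolean failMidway) {
        writeInto(shared, record, failMidway);
        byte[] out = shared.toByteArray();
        shared.reset(); // only reached when the call succeeds
        return out;
    }

    /** Creates a fresh buffer per call, so a failed call cannot affect the next one. */
    public byte[] writeFresh(String record, boolean failMidway) {
        ByteArrayOutputStream local = new ByteArrayOutputStream();
        writeInto(local, record, failMidway);
        return local.toByteArray();
    }

    public static void main(String[] args) {
        BufferReuseDemo demo = new BufferReuseDemo();

        try { demo.writeReused("abcd", true); } catch (IllegalStateException ignored) { }
        // The stale "ab" from the failed call precedes the new record.
        System.out.println(new String(demo.writeReused("wxyz", false), StandardCharsets.UTF_8)); // abwxyz

        try { demo.writeFresh("abcd", true); } catch (IllegalStateException ignored) { }
        System.out.println(new String(demo.writeFresh("wxyz", false), StandardCharsets.UTF_8)); // wxyz
    }
}
```

Whether this is what actually happens inside Flink's serializer in this case is not established by the thread; the sketch only shows the general failure mode that a per-call buffer avoids.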

This is my attempt to fix the problem (it is the KryoSerializer class from the flink-core module):


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;

import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

/**
 * A type serializer that serializes its type using the Kryo serialization
 * 
 * This serializer is intended as a fallback serializer for the cases that are
 * not covered by the basic types, tuples, and POJOs.
 *
 * @param <T> The type to be serialized.
 */
public class KryoSerializer<T> extends TypeSerializer<T> {

private static final long serialVersionUID = 3L;

private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);

// ------------------------------------------------------------------------

private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
private final LinkedHashSet<Class<?>> registeredTypes;

private final Class<T> type;
// ------------------------------------------------------------------------
// The fields below are lazily initialized after duplication or deserialization.

private transient Kryo kryo;
private transient T copyInstance;
// ------------------------------------------------------------------------

public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
this.type = Preconditions.checkNotNull(type);

this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
this.registeredTypes = executionConfig.getRegisteredKryoTypes();
}

/**
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
defaultSerializers = toCopy.defaultSerializers;
defaultSerializerClasses = toCopy.defaultSerializerClasses;
registeredTypes = toCopy.registeredTypes;

type = toCopy.type;
if(type == null){
throw new NullPointerException("Type class cannot be null.");
}
}

// ------------------------------------------------------------------------

@Override
public boolean isImmutableType() {
return false;
}

@Override
public KryoSerializer<T> duplicate() {
return new KryoSerializer<T>(this);
}

@Override
public T createInstance() {
if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) {
return null;
} else {
checkKryoInitialized();
try {
return kryo.newInstance(type);
} catch(Throwable e) {
return null;
}
}
}

@SuppressWarnings("unchecked")
@Override
public T copy(T from) {
if (from == null) {
return null;
}
checkKryoInitialized();
try {
return kryo.copy(from);
}
catch(KryoException ke) {
// kryo was unable to copy it, so we do it through serialization:
ByteArrayOutputStream baout = new ByteArrayOutputStream();
Output output = new Output(baout);

kryo.writeObject(output, from);

output.close();

ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
Input input = new Input(bain);

return (T)kryo.readObject(input, from.getClass());
}
}
@Override
public T copy(T from, T reuse) {
return copy(from);
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(T record, DataOutputView target) throws IOException {
    checkKryoInitialized();
    DataOutputViewStream outputStream = new DataOutputViewStream(target);
    Output output = new Output(outputStream);

    try {
        // Sanity check: Make sure that the output is cleared/has been flushed by the last call
        // otherwise data might be written multiple times in case of a previous EOFException
        if (output.position() != 0) {
            throw new IllegalStateException("The Kryo Output still contains data from a previous " +
                "serialize call. It has to be flushed or cleared at the end of the serialize call.");
        }

        kryo.writeClassAndObject(output, record);
        output.flush();
    } catch (KryoException ke) {
        Throwable cause = ke.getCause();
        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {
        try {
            output.close();
        } catch (KryoException ke) {
            Throwable cause = ke.getCause();

            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            } else {
                throw ke;
            }
        }
    }
}

@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
    checkKryoInitialized();
    DataInputViewStream inputStream = new DataInputViewStream(source);
    Input input = new NoFetchingInput(inputStream);

    try {
        return (T) kryo.readClassAndObject(input);
    } catch (KryoException ke) {
        Throwable cause = ke.getCause();

        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {
        try {
            input.close();
        } catch (KryoException ke) {
            Throwable cause = ke.getCause();

            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            } else {
                throw ke;
            }
        }
    }
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
return deserialize(source);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
checkKryoInitialized();
if(this.copyInstance == null){
this.copyInstance = createInstance();
}

T tmp = deserialize(copyInstance, source);
serialize(tmp, target);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return Objects.hash(
type,
registeredTypes,
registeredTypesWithSerializerClasses,
defaultSerializerClasses);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof KryoSerializer) {
KryoSerializer<?> other = (KryoSerializer<?>) obj;

// we cannot include the Serializers here because they don't implement the equals method
return other.canEqual(this) &&
type == other.type &&
registeredTypes.equals(other.registeredTypes) &&
registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
defaultSerializerClasses.equals(other.defaultSerializerClasses);
} else {
return false;
}
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof KryoSerializer;
}

// --------------------------------------------------------------------------------------------

/**
* Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
* Falls back to the default Kryo serializer if it can't be found.
* @return The Kryo serializer instance.
*/
private Kryo getKryoInstance() {

try {
// check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
// This will be true if Flink's Scala API is used.
Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
Object chillInstantiator = chillInstantiatorClazz.newInstance();

// obtain a Kryo instance through Twitter Chill
Method m = chillInstantiatorClazz.getMethod("newKryo");

return (Kryo) m.invoke(chillInstantiator);
} catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
IllegalAccessException | InvocationTargetException e) {

LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);

Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(initStrategy);

return kryo;
}
}

private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = getKryoInstance();

// Enable reference tracking. 
kryo.setReferences(true);
// Throwable and all subclasses should be serialized via java serialization
kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

// Add default serializers first, so that the type registrations without a serializer
// are registered with a default serializer
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
}

for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
}

// register the type of our class
kryo.register(type);

// register given types. we do this first so that any registration of a
// more specific serializer overrides this
for (Class<?> type : registeredTypes) {
kryo.register(type);
}

// register given serializer classes
for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
Class<?> typeClass = e.getKey();
Class<? extends Serializer<?>> serializerClass = e.getValue();

Serializer<?> serializer =
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
kryo.register(typeClass, serializer);
}

// register given serializers
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
kryo.register(e.getKey(), e.getValue().getSerializer());
}
// this is needed for Avro but can not be added on demand.
kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());

kryo.setRegistrationRequired(false);
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
}
}

// --------------------------------------------------------------------------------------------
// For testing
// --------------------------------------------------------------------------------------------
public Kryo getKryo() {
checkKryoInitialized();
return this.kryo;
}
}

Best,
Flavio


Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Ahmed Nader
In reply to this post by Till Rohrmann
Hello Till,
Thanks so much for your reply. Here's my program.
This is the TwitterSource:
public class TwitterSource extends Stream {
private static final long serialVersionUID = 1L;
protected transient BlockingQueue<String> queue;
protected int queueSize = 10000;
private transient BasicClient client;
private int waitSec = 5;

private int maxNumberOfTweets;
private int currentNumberOfTweets;

private volatile boolean isRunning = true;

public TwitterSource(int numberOfTweets) {
this.maxNumberOfTweets = numberOfTweets;
currentNumberOfTweets = 0;
}

public void initializeConnection() {
queue = new LinkedBlockingQueue<>(queueSize);

UserstreamEndpoint endpoint = new UserstreamEndpoint ();
endpoint.stallWarnings(false);

Authentication auth = authenticate();

initializeClient(endpoint, auth);

}

public OAuth1 authenticate() {
return new OAuth1("---");
}

protected void initializeClient(DefaultStreamingEndpoint endpoint, Authentication auth) {
client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.USERSTREAM_HOST)
.endpoint(endpoint).authentication(auth)
.processor(new StringDelimitedProcessor(queue)).build();

client.connect();
}

@Override
public void run(SourceContext<Object> sourceContext) throws Exception {
initializeConnection();
while (isRunning) {
sourceContext.collect(queue.take());
currentNumberOfTweets++;
if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) {
break;
}
Thread.sleep(1000);
}
}

@Override
public void cancel() {
isRunning = false;
} }
Then I initialize it:
List<Object> globalEntities = new ArrayList<>();
Iterator iterator;

public void runModel(String key) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream twitter = env.addSource(new TwitterSource(30)).flatMap(new processTweets());
    twitter.filter(new FilterFunction<Object>() {
        @Override
        public boolean filter(Object tweet) throws Exception {
            Tweet singleTweet = (Tweet) tweet;
            return singleTweet.search(key);
        }
    }).print();
    // Note: "datastream" is not declared anywhere in the posted snippet.
    iterator = DataStreamUtils.collect(datastream);
}

// This method is called periodically by an Ajax call every 2 seconds.
public void viewResults(Model model) {
    if (iterator != null) {
        if (iterator.hasNext()) {
            globalEntities.add(iterator.next());
        }
    }
    if (!globalEntities.isEmpty()) {
        model.addAttribute("list", globalEntities);
    }
}
public static class processTweets extends JSONParseFlatMap<Object, Object> {
@Override
public void flatMap(Object value, Collector<Object> out) throws Exception {
try {
//if (getString((String)value, "user.lang").equals("en")) {
Tweet tweet = new Tweet();
// message of tweet
tweet.setText(getString((String) value, "text"));
tweet.setUser(getString((String) value, "user.name"));
out.collect(tweet);
// }
} catch (JSONException e) {
// the JSON was not parsed correctly
}
}
}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Tweet {
private String user;
private String text;

public Tweet() {

}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getText() {
return text;
}

public void setText(String text) {
this.text = text;
}

public String toString() {
return this.user+" : "+this.text;
}

public boolean search(String key) {
String patternString = ".*"+key+".*";
Pattern pattern = Pattern.compile(patternString, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(this.toString());
return matcher.find();
}
}
And that's the stack trace:
2016-06-08 17:20:10.091  INFO 13564 --- [om Source (1/1)] o.apache.flink.runtime.taskmanager.Task  : Source: Custom Source (1/1) switched to RUNNING
2016-06-08 17:20:10.096  INFO 13564 --- [lt-dispatcher-2] o.a.f.r.executiongraph.ExecutionGraph    : Sink: Unnamed (1/1) (50d42a893093705f278bd0aa099a53d3) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.097  INFO 13564 --- [lt-dispatcher-5] o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink: Unnamed (4/4) (3c98bbdab04256d73f1f405669d007a8) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.097  INFO 13564 --- [lt-dispatcher-3] o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink: Unnamed (3/4) (49c23924bae61cead7158bc817c22d0b) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.097  INFO 13564 --- [lt-dispatcher-4] o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink: Unnamed (2/4) (7cfc6b655d7bb70beb901012094db0e5) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.099  INFO 13564 --- [lt-dispatcher-7] o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink: Unnamed (1/4) (6afafe555579dc31e1c974c7238d486c) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.099  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Sink: Unnamed(1/1) switched to RUNNING 
06/08/2016 17:20:10 Sink: Unnamed(1/1) switched to RUNNING 
2016-06-08 17:20:10.100  INFO 13564 --- [lt-dispatcher-9] o.a.f.r.executiongraph.ExecutionGraph    : Source: Custom Source (1/1) (92f08ef0a7d715478bf9fe60e8bc4dea) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.107  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(4/4) switched to RUNNING 
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(4/4) switched to RUNNING 
2016-06-08 17:20:10.108  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(3/4) switched to RUNNING 
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(3/4) switched to RUNNING 
2016-06-08 17:20:10.109  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(2/4) switched to RUNNING 
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(2/4) switched to RUNNING 
2016-06-08 17:20:10.109  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(1/4) switched to RUNNING 
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(1/4) switched to RUNNING 
2016-06-08 17:20:10.109  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Source: Custom Source(1/1) switched to RUNNING 
06/08/2016 17:20:10 Source: Custom Source(1/1) switched to RUNNING 
2016-06-08 17:20:10.124  WARN 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.125  INFO 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.129  WARN 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.132  INFO 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.132  WARN 13564 --- [: Unnamed (1/1)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.132  INFO 13564 --- [: Unnamed (1/1)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.136  WARN 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.136  INFO 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.137  WARN 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.137  INFO 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.138  WARN 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.138  INFO 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.139  WARN 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.139  INFO 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.140  WARN 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.140  INFO 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.148  WARN 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.148  INFO 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.149  WARN 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.149  INFO 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.150  WARN 13564 --- [om Source (1/1)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.150  INFO 13564 --- [om Source (1/1)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.158  WARN 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.159  INFO 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.159  WARN 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.160  INFO 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.160  WARN 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.160  INFO 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.359  INFO 13564 --- [om Source (1/1)] com.twitter.hbc.httpclient.BasicClient   : New connection executed: twitterSourceClient, endpoint: /1.1/user.json?delimited=length
2016-06-08 17:20:10.622  INFO 13564 --- [ent-io-thread-0] com.twitter.hbc.httpclient.ClientBase    : twitterSourceClient Establishing a connection
2016-06-08 17:20:11.687  INFO 13564 --- [ent-io-thread-0] com.twitter.hbc.httpclient.ClientBase    : twitterSourceClient Processing connection data

Then here are some of the results printed:

2> Fast Company : Homemade brings an Etsy mindset to food https://t.co/9uaOTPka58 https://t.co/VB7IFIxrdM
3> tagesthemen : Noch immer #Flüchtlinge auf der #Balkanroute. An der Grenze zu #Ungarn wartet Stacheldraht - der hat aber Löcher.
4> BuzzFeed News : Maria Sharapova says that she will fight back against the ITF's decision to suspend her https://t.co/DuNhDUv64f https://t.co/3BZcfR9Vid

Then the exceptions are thrown:

2016-06-08 17:20:42.326 ERROR 13564 --- [io-8080-exec-10] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 97] with root cause

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 97
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[kryo-2.24.0.jar:na]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) ~[flink-core-1.0.3.jar:1.0.3]
    at org.apache.flink.contrib.streaming.DataStreamIterator.readNextFromStream(DataStreamIterator.java:108) ~[flink-streaming-contrib-0.10.2.jar:0.10.2]
    at org.apache.flink.contrib.streaming.DataStreamIterator.hasNext(DataStreamIterator.java:78) ~[flink-streaming-contrib-0.10.2.jar:0.10.2]
    at com.example.controllers.MiningModelController.viewResults(MiningModelController.java:559) ~[classes/:na]
    at com.example.controllers.MiningModelController$$FastClassBySpringCGLIB$$d468bf0d.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:651) ~[spring-aop-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at com.example.controllers.MiningModelController$$EnhancerBySpringCGLIB$$de7f687c.viewResults(<generated>) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_73]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_73]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_73]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_73]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(HttpServlet.service(HttpServlet.java:729) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:121) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522) AbstractConnectionHandler.process(AbstractProtocol.java:672) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]

java.lang.IndexOutOfBoundsException: Index: 99, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653) ~[na:1.8.0_73]
    at java.util.ArrayList.get(ArrayList.java:429) ~[na:1.8.0_73]
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) ~[kryo-2.24.0.jar:na]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) ~[kryo-2.24.0.jar:na]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) ~[flink-core-1.0.3.jar:1.0.3]
    at org.apache.flink.contrib.streaming.DataStreamIterator.readNextFromStream(DataStreamIterator.java:108) ~[flink-streaming-contrib-0.10.2.jar:0.10.2]
    at org.apache.flink.contrib.streaming.DataStreamIterator.hasNext(DataStreamIterator.java:78) ~[flink-streaming-contrib-0.10.2.jar:0.10.2]
    at com.example.controllers.MiningModelController.viewResults(MiningModelController.java:559) ~[classes/:na]
    at com.example.controllers.MiningModelController$$FastClassBySpringCGLIB$$d468bf0d.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:651) ~[spring-aop-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at com.example.controllers.MiningModelController$$EnhancerBySpringCGLIB$$de7f687c.viewResults(<generated>) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_73]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_73]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_73]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_73]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1095) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:672) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.0.32.jar:8.0.32]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]

Please note that both exceptions point to the line where I check the condition if(iterator.hasNext()).

Thanks,
Ahmed
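
The message above describes polling the iterator from a separate thread, and the traces show hasNext() being called on a web request thread (io-8080-exec-10). Whether threading plays any role in these exceptions is not established in this thread. Purely as a sketch under that assumption, one way to make sure only a single thread ever touches the iterator is to drain it into a queue and let all other threads read from the queue. The names IteratorDrain, startDrain and pollAvailable are hypothetical, and a plain java.util.Iterator stands in for the one returned by DataStreamUtils.collect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper; not part of Flink.
final class IteratorDrain {

    // Starts one daemon thread that is the only caller of hasNext()/next().
    // Elements must be non-null, because LinkedBlockingQueue rejects nulls.
    static <T> Thread startDrain(Iterator<T> source, BlockingQueue<T> sink) {
        Thread drainer = new Thread(() -> {
            while (source.hasNext()) {
                sink.add(source.next());
            }
        }, "iterator-drain");
        drainer.setDaemon(true);
        drainer.start();
        return drainer;
    }

    // Non-blocking: returns whatever has arrived so far (possibly nothing).
    // Safe to call from any thread, e.g. a request handler.
    static <T> List<T> pollAvailable(BlockingQueue<T> sink) {
        List<T> batch = new ArrayList<>();
        sink.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        // Stand-in for the iterator obtained from the stream.
        Iterator<String> items = Arrays.asList("a", "b", "c").iterator();

        Thread drainer = startDrain(items, results);
        drainer.join(); // only for the demo; a real stream may never end

        System.out.println(pollAvailable(results)); // prints [a, b, c]
    }
}
```

With this layout a controller method would call pollAvailable(results) instead of iterator.hasNext(), so the iterator itself is confined to the drain thread.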







Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Ahmed Nader
In reply to this post by Flavio Pompermaier
Hello Flavio,
Thank you so much for replying. However, I didn't download Flink locally; I only added the dependencies to a Maven project, so I don't think I'll be able to modify the KryoSerializer class. But yes, I also think that it's the problem.
Thanks,
Ahmed

On 8 June 2016 at 16:07, Flavio Pompermaier <[hidden email]> wrote:
Hi Ahmed,
I'm also seeing the same error, which is probably caused by the KryoSerializer.
Right now I'm testing a patch for this problem, so maybe you could test it too. Unfortunately I'm using Flink 1.0.2, so I don't know for sure whether you can use my KryoSerializer, but I think so. Essentially, I just create a new Input and Output on every call to serialize/deserialize and close them afterwards.

This is my attempt to fix the problem (it is the KryoSerializer class from the flink-core module):


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;

import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

/**
 * A type serializer that serializes its type using the Kryo serialization framework.
 *
 * This serializer is intended as a fallback serializer for the cases that are
 * not covered by the basic types, tuples, and POJOs.
 *
 * @param <T> The type to be serialized.
 */
public class KryoSerializer<T> extends TypeSerializer<T> {

private static final long serialVersionUID = 3L;

private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);

// ------------------------------------------------------------------------

private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
private final LinkedHashSet<Class<?>> registeredTypes;

private final Class<T> type;
// ------------------------------------------------------------------------
// The fields below are lazily initialized after duplication or deserialization.

private transient Kryo kryo;
private transient T copyInstance;
// ------------------------------------------------------------------------

public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
this.type = Preconditions.checkNotNull(type);

this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
this.registeredTypes = executionConfig.getRegisteredKryoTypes();
}

/**
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
defaultSerializers = toCopy.defaultSerializers;
defaultSerializerClasses = toCopy.defaultSerializerClasses;
registeredTypes = toCopy.registeredTypes;

type = toCopy.type;
if(type == null){
throw new NullPointerException("Type class cannot be null.");
}
}

// ------------------------------------------------------------------------

@Override
public boolean isImmutableType() {
return false;
}

@Override
public KryoSerializer<T> duplicate() {
return new KryoSerializer<T>(this);
}

@Override
public T createInstance() {
if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) {
return null;
} else {
checkKryoInitialized();
try {
return kryo.newInstance(type);
} catch(Throwable e) {
return null;
}
}
}

@SuppressWarnings("unchecked")
@Override
public T copy(T from) {
if (from == null) {
return null;
}
checkKryoInitialized();
try {
return kryo.copy(from);
}
catch(KryoException ke) {
// kryo was unable to copy it, so we do it through serialization:
ByteArrayOutputStream baout = new ByteArrayOutputStream();
Output output = new Output(baout);

kryo.writeObject(output, from);

output.close();

ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
Input input = new Input(bain);

return (T)kryo.readObject(input, from.getClass());
}
}
@Override
public T copy(T from, T reuse) {
return copy(from);
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(T record, DataOutputView target) throws IOException {
    checkKryoInitialized();
    DataOutputViewStream outputStream = new DataOutputViewStream(target);
    Output output = new Output(outputStream);

    try {
        // Sanity check: Make sure that the output is cleared/has been flushed by the last call
        // otherwise data might be written multiple times in case of a previous EOFException
        if (output.position() != 0) {
            throw new IllegalStateException("The Kryo Output still contains data from a previous " +
                "serialize call. It has to be flushed or cleared at the end of the serialize call.");
        }

        kryo.writeClassAndObject(output, record);
        output.flush();
    }
    catch (KryoException ke) {
        Throwable cause = ke.getCause();
        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        }
        else {
            throw ke;
        }
    } finally {
        try {
            output.close();
        } catch (KryoException ke) {
            Throwable cause = ke.getCause();

            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            } else {
                throw ke;
            }
        }
    }
}

@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
    checkKryoInitialized();
    DataInputViewStream inputStream = new DataInputViewStream(source);
    Input input = new NoFetchingInput(inputStream);

    try {
        return (T) kryo.readClassAndObject(input);
    } catch (KryoException ke) {
        Throwable cause = ke.getCause();

        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {
        try {
            input.close();
        } catch (KryoException ke) {
            Throwable cause = ke.getCause();

            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            } else {
                throw ke;
            }
        }
    }
}

@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
return deserialize(source);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
checkKryoInitialized();
if(this.copyInstance == null){
this.copyInstance = createInstance();
}

T tmp = deserialize(copyInstance, source);
serialize(tmp, target);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return Objects.hash(
type,
registeredTypes,
registeredTypesWithSerializerClasses,
defaultSerializerClasses);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof KryoSerializer) {
KryoSerializer<?> other = (KryoSerializer<?>) obj;

// we cannot include the Serializers here because they don't implement the equals method
return other.canEqual(this) &&
type == other.type &&
registeredTypes.equals(other.registeredTypes) &&
registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
defaultSerializerClasses.equals(other.defaultSerializerClasses);
} else {
return false;
}
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof KryoSerializer;
}

// --------------------------------------------------------------------------------------------

/**
* Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
* Falls back to the default Kryo serializer if it can't be found.
* @return The Kryo serializer instance.
*/
private Kryo getKryoInstance() {

try {
// check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
// This will be true if Flink's Scala API is used.
Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
Object chillInstantiator = chillInstantiatorClazz.newInstance();

// obtain a Kryo instance through Twitter Chill
Method m = chillInstantiatorClazz.getMethod("newKryo");

return (Kryo) m.invoke(chillInstantiator);
} catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
IllegalAccessException | InvocationTargetException e) {

LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);

Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(initStrategy);

return kryo;
}
}

private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = getKryoInstance();

// Enable reference tracking. 
kryo.setReferences(true);
// Throwable and all subclasses should be serialized via java serialization
kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

// Add default serializers first, so that the type registrations without a serializer
// are registered with a default serializer
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
}

for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
}

// register the type of our class
kryo.register(type);

// register given types. we do this first so that any registration of a
// more specific serializer overrides this
for (Class<?> type : registeredTypes) {
kryo.register(type);
}

// register given serializer classes
for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
Class<?> typeClass = e.getKey();
Class<? extends Serializer<?>> serializerClass = e.getValue();

Serializer<?> serializer =
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
kryo.register(typeClass, serializer);
}

// register given serializers
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
kryo.register(e.getKey(), e.getValue().getSerializer());
}
// this is needed for Avro but can not be added on demand.
kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());

kryo.setRegistrationRequired(false);
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
}
}

// --------------------------------------------------------------------------------------------
// For testing
// --------------------------------------------------------------------------------------------
public Kryo getKryo() {
checkKryoInitialized();
return this.kryo;
}
}
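
The core of the change in serialize() and deserialize() above, creating a fresh wrapper for every call and closing it afterwards instead of keeping one long-lived wrapper, can be illustrated without Kryo. The sketch below is only an analogy built on java.io: PerCallStreams, writeRecord and readRecord are hypothetical names, and DataOutputStream/DataInputStream stand in for Kryo's Output/Input. It does not reproduce the exceptions discussed in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration; not part of Flink or Kryo.
final class PerCallStreams {

    // Writes one record through a wrapper created for this call only.
    // try-with-resources flushes and closes the wrapper, so nothing it
    // holds can leak into the next call.
    static void writeRecord(String record, ByteArrayOutputStream target) {
        try (DataOutputStream out = new DataOutputStream(target)) {
            out.writeUTF(record);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads one record through a wrapper created for this call only.
    // DataInputStream does not read ahead, so the underlying stream is
    // left positioned exactly at the start of the next record.
    static String readRecord(ByteArrayInputStream source) {
        try (DataInputStream in = new DataInputStream(source)) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Closing a ByteArrayOutputStream/ByteArrayInputStream has no effect,
        // so the same underlying stream can be wrapped again on the next call.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        writeRecord("first", sink);
        writeRecord("second", sink);

        ByteArrayInputStream source = new ByteArrayInputStream(sink.toByteArray());
        System.out.println(readRecord(source)); // prints first
        System.out.println(readRecord(source)); // prints second
    }
}
```

The trade-off of this pattern is an extra allocation per record in exchange for never carrying wrapper state from one call to the next.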

Best,
Flavio







Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Till Rohrmann
Hi Ahmed,

I tried setting up your use case and for me it all seems to work. However, I didn't use the Spring framework and executed the program in a local Flink cluster.

Maybe you can put together a self-contained example (including example data) that reproduces your problem and send it to us.

Cheers,
Till

On Wed, Jun 8, 2016 at 5:48 PM, Ahmed Nader <[hidden email]> wrote:
Hello Flavio,
Thank you so much for replying. However, I didn't download Flink locally; I only added the dependencies to a Maven project, so I don't think I'll be able to modify the KryoSerializer class. But yes, I also think that it's the problem.
Thanks,
Ahmed

On 8 June 2016 at 16:07, Flavio Pompermaier <[hidden email]> wrote:
Hi Ahmed,
I'm also seeing the same error, which is probably caused by the KryoSerializer.
Right now I'm testing a patch for this problem, so maybe you could test it too. Unfortunately I'm using Flink 1.0.2, so I don't know for sure whether you can use my KryoSerializer, but I think so. Essentially, I just create a new Input and Output on every call to serialize/deserialize and close them afterwards.

This is my attempt to fix the problem (it is the KryoSerializer class from the flink-core module):


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;

import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

/**
 * A type serializer that serializes its type using the Kryo serialization
 * 
 * This serializer is intended as a fallback serializer for the cases that are
 * not covered by the basic types, tuples, and POJOs.
 *
 * @param <T> The type to be serialized.
 */
public class KryoSerializer<T> extends TypeSerializer<T> {

private static final long serialVersionUID = 3L;

private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);

// ------------------------------------------------------------------------

private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
private final LinkedHashSet<Class<?>> registeredTypes;

private final Class<T> type;
// ------------------------------------------------------------------------
// The fields below are lazily initialized after duplication or deserialization.

private transient Kryo kryo;
private transient T copyInstance;
// ------------------------------------------------------------------------

public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
this.type = Preconditions.checkNotNull(type);

this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
this.registeredTypes = executionConfig.getRegisteredKryoTypes();
}

/**
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
defaultSerializers = toCopy.defaultSerializers;
defaultSerializerClasses = toCopy.defaultSerializerClasses;
registeredTypes = toCopy.registeredTypes;

type = toCopy.type;
if(type == null){
throw new NullPointerException("Type class cannot be null.");
}
}

// ------------------------------------------------------------------------

@Override
public boolean isImmutableType() {
return false;
}

@Override
public KryoSerializer<T> duplicate() {
return new KryoSerializer<T>(this);
}

@Override
public T createInstance() {
if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) {
return null;
} else {
checkKryoInitialized();
try {
return kryo.newInstance(type);
} catch(Throwable e) {
return null;
}
}
}

@SuppressWarnings("unchecked")
@Override
public T copy(T from) {
if (from == null) {
return null;
}
checkKryoInitialized();
try {
return kryo.copy(from);
}
catch(KryoException ke) {
// kryo was unable to copy it, so we do it through serialization:
ByteArrayOutputStream baout = new ByteArrayOutputStream();
Output output = new Output(baout);

kryo.writeObject(output, from);

output.close();

ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
Input input = new Input(bain);

return (T)kryo.readObject(input, from.getClass());
}
}
@Override
public T copy(T from, T reuse) {
return copy(from);
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(T record, DataOutputView target) throws IOException {
    checkKryoInitialized();
    DataOutputViewStream outputStream = new DataOutputViewStream(target);
    Output output = new Output(outputStream);

    try {
        // Sanity check: Make sure that the output is cleared/has been flushed by the last call
        // otherwise data might be written multiple times in case of a previous EOFException
        if (output.position() != 0) {
            throw new IllegalStateException("The Kryo Output still contains data from a previous " +
                "serialize call. It has to be flushed or cleared at the end of the serialize call.");
        }

        kryo.writeClassAndObject(output, record);
        output.flush();
    }
    catch (KryoException ke) {
        Throwable cause = ke.getCause();
        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        }
        else {
            throw ke;
        }
    } finally {
        try {
            output.close();
        } catch (KryoException ke) {
            Throwable cause = ke.getCause();

            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            } else {
                throw ke;
            }
        }
    }
}

@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
    checkKryoInitialized();
    DataInputViewStream inputStream = new DataInputViewStream(source);
    Input input = new NoFetchingInput(inputStream);

    try {
        return (T) kryo.readClassAndObject(input);
    } catch (KryoException ke) {
        Throwable cause = ke.getCause();

        if (cause instanceof EOFException) {
            throw (EOFException) cause;
        } else {
            throw ke;
        }
    } finally {
        try {
            input.close();
        } catch (KryoException ke) {
            Throwable cause = ke.getCause();

            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            } else {
                throw ke;
            }
        }
    }
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
return deserialize(source);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
checkKryoInitialized();
if(this.copyInstance == null){
this.copyInstance = createInstance();
}

T tmp = deserialize(copyInstance, source);
serialize(tmp, target);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return Objects.hash(
type,
registeredTypes,
registeredTypesWithSerializerClasses,
defaultSerializerClasses);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof KryoSerializer) {
KryoSerializer<?> other = (KryoSerializer<?>) obj;

// we cannot include the Serializers here because they don't implement the equals method
return other.canEqual(this) &&
type == other.type &&
registeredTypes.equals(other.registeredTypes) &&
registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
defaultSerializerClasses.equals(other.defaultSerializerClasses);
} else {
return false;
}
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof KryoSerializer;
}

// --------------------------------------------------------------------------------------------

/**
* Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
* Falls back to the default Kryo serializer if it can't be found.
* @return The Kryo serializer instance.
*/
private Kryo getKryoInstance() {

try {
// check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
// This will be true if Flink's Scala API is used.
Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
Object chillInstantiator = chillInstantiatorClazz.newInstance();

// obtain a Kryo instance through Twitter Chill
Method m = chillInstantiatorClazz.getMethod("newKryo");

return (Kryo) m.invoke(chillInstantiator);
} catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
IllegalAccessException | InvocationTargetException e) {

LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);

Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(initStrategy);

return kryo;
}
}

private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = getKryoInstance();

// Enable reference tracking. 
kryo.setReferences(true);
// Throwable and all subclasses should be serialized via java serialization
kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

// Add default serializers first, so that type registrations without a serializer
// are registered with a default serializer
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
}

for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
}

// register the type of our class
kryo.register(type);

// register given types. we do this first so that any registration of a
// more specific serializer overrides this
for (Class<?> type : registeredTypes) {
kryo.register(type);
}

// register given serializer classes
for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
Class<?> typeClass = e.getKey();
Class<? extends Serializer<?>> serializerClass = e.getValue();

Serializer<?> serializer =
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
kryo.register(typeClass, serializer);
}

// register given serializers
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
kryo.register(e.getKey(), e.getValue().getSerializer());
}
// this is needed for Avro but cannot be added on demand.
kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());

kryo.setRegistrationRequired(false);
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
}
}

// --------------------------------------------------------------------------------------------
// For testing
// --------------------------------------------------------------------------------------------
public Kryo getKryo() {
checkKryoInitialized();
return this.kryo;
}
}
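
To make the role of the registration order in checkKryoInitialized() a bit more concrete, here is a minimal toy sketch in plain Java. It is not Kryo and does not use Kryo's API: ToyRegistry, register, idOf and classOf are hypothetical names made up for illustration. The only assumption carried over is that each registered class receives the next free integer ID, which is presumably why the registered types above are kept in LinkedHashMap/LinkedHashSet (stable iteration order). Under that assumption, a reader whose registry was built from a different set of classes either fails on an unknown ID or silently resolves the ID to the wrong class.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of ID-based class registration. This is NOT Kryo; it only mimics
 * the idea that each registered class gets the next free integer ID.
 */
public class RegistrationMismatchDemo {

    /** Hypothetical stand-in for a serializer's class registry. */
    static final class ToyRegistry {
        private final List<Class<?>> byId = new ArrayList<>();

        /** Registers a class under the next free ID and returns that ID. */
        int register(Class<?> type) {
            int existing = byId.indexOf(type);
            if (existing >= 0) {
                return existing;
            }
            byId.add(type);
            return byId.size() - 1;
        }

        /** Writer side: the ID that would be written to the stream for this class. */
        int idOf(Class<?> type) {
            int id = byId.indexOf(type);
            if (id < 0) {
                throw new IllegalArgumentException("Not registered: " + type.getName());
            }
            return id;
        }

        /** Reader side: resolve an ID read from the stream back to a class. */
        Class<?> classOf(int id) {
            if (id < 0 || id >= byId.size()) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            return byId.get(id);
        }
    }

    public static void main(String[] args) {
        // The writer registers three classes: String=0, Integer=1, Long=2.
        ToyRegistry writer = new ToyRegistry();
        writer.register(String.class);
        writer.register(Integer.class);
        writer.register(Long.class);

        // The reader was initialized with a different set: String=0, Long=1.
        ToyRegistry reader = new ToyRegistry();
        reader.register(String.class);
        reader.register(Long.class);

        // Case 1: the writer's ID for Long is unknown to the reader.
        try {
            reader.classOf(writer.idOf(Long.class));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // Case 2: the writer's ID for Integer resolves to a different class on the reader.
        System.out.println(reader.classOf(writer.idOf(Integer.class)).getSimpleName());
    }
}
```

Running main prints "Encountered unregistered class ID: 2" followed by "Long". The second case is the nastier one, since nothing fails at lookup time and the bytes are simply interpreted as the wrong type.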

Best,
Flavio

--

Flavio Pompermaier