Hello,
I have a TwitterSource and I'm applying some transformations, such as filter and map, to the resulting stream from Twitter. I'm collecting the output in an iterator: iterator = DataStreamUtils.collect(datastream). Then, in a parallel thread, I periodically check iterator.hasNext() and print the next item. I'm using Flink 1.0.3. The program works at the beginning and actually prints some items. However, when I leave it running for a while (for example, 40 seconds to 1 minute), I get two exceptions: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0. Both exceptions come from the line where I check iterator.hasNext(). I would like to know why these exceptions happen in general, and if anyone knows a specific solution for my program, that would be great too. Thanks, Ahmed
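For readers following the setup described above, here is a minimal sketch of the consumption pattern, using only the JDK. It assumes (this is an assumption, not something confirmed in the thread) that the iterator returned by collect() blocks in hasNext() until data or end-of-stream arrives and is meant to be read by one thread only. QueueIterator, END and drainOnSingleThread are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingIteratorDemo {
    /** Sentinel marking end-of-stream in this toy model. */
    static final Object END = new Object();

    /** Hypothetical stand-in for a collected stream: hasNext() blocks until something arrives. */
    static final class QueueIterator implements Iterator<Object> {
        private final BlockingQueue<Object> queue;
        private Object next; // element fetched by hasNext() but not yet handed out

        QueueIterator(BlockingQueue<Object> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            if (next == null) {
                try {
                    next = queue.take(); // blocks until an element or END is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return next != END;
        }

        @Override
        public Object next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Object result = next;
            next = null;
            return result;
        }
    }

    /** Reads the iterator from exactly one dedicated thread and returns what it saw. */
    static List<Object> drainOnSingleThread(Iterator<Object> it) {
        List<Object> seen = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            while (it.hasNext()) {
                seen.add(it.next());
            }
        });
        consumer.start();
        try {
            consumer.join(); // join() makes the consumer's writes to 'seen' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            queue.add("tweet-1");
            queue.add("tweet-2");
            queue.add(END);
        });
        producer.start();
        System.out.println(drainOnSingleThread(new QueueIterator(queue))); // prints [tweet-1, tweet-2]
    }
}
```

The point of the sketch is only that a blocking iterator needs no periodic polling and should have a single reader; whether concurrent access plays any role in the exceptions above is not established in this thread.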
Hi Ahmed, the problem usually occurs if you use differently initialized Kryo instances, where one instance has a different set of classes registered than the other. But your data could also be corrupted, because the IndexOutOfBoundsException shows an attempt to access the element at index 32 of an array with size 0. To debug the problem, it would be helpful to see the full stack traces of the errors and the complete error messages. It would also be helpful to see your program so that we can try to reproduce the problem. Cheers, Till On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <[hidden email]> wrote:
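To illustrate why differently initialized instances matter, here is a toy model written with the JDK only. It is not Kryo's implementation; ToyRegistry and its methods are hypothetical. It only assumes that class IDs are handed out in registration order, so a writer and a reader that registered different sets of classes disagree on what an ID means.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a serializer's class registry (hypothetical, not Kryo's real code). */
class ToyRegistry {
    private final List<Class<?>> byId = new ArrayList<>();

    /** Registers a class and returns its ID; IDs follow registration order. */
    int register(Class<?> type) {
        int existing = byId.indexOf(type);
        if (existing >= 0) {
            return existing;
        }
        byId.add(type);
        return byId.size() - 1;
    }

    /** ID the writer would put on the wire for the given class. */
    int idOf(Class<?> type) {
        int id = byId.indexOf(type);
        if (id < 0) {
            throw new IllegalArgumentException("Class not registered: " + type.getName());
        }
        return id;
    }

    /** Class the reader resolves from an ID found on the wire. */
    Class<?> classOf(int id) {
        if (id < 0 || id >= byId.size()) {
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return byId.get(id);
    }
}

class RegistryMismatchDemo {
    public static void main(String[] args) {
        ToyRegistry writer = new ToyRegistry();
        writer.register(String.class);  // ID 0
        writer.register(Integer.class); // ID 1
        writer.register(Long.class);    // ID 2

        ToyRegistry reader = new ToyRegistry();
        reader.register(String.class);  // ID 0
        reader.register(Long.class);    // ID 1, because Integer was never registered here

        // Case 1: the reader has no entry for the writer's ID at all.
        try {
            reader.classOf(writer.idOf(Long.class));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints Encountered unregistered class ID: 2
        }

        // Case 2: the ID exists but means a different class, so the bytes are misread silently.
        System.out.println(reader.classOf(writer.idOf(Integer.class)).getSimpleName()); // prints Long
    }
}
```

The second case is the more dangerous one in this model: nothing fails at lookup time, and the payload is decoded as the wrong type, which could surface later as an unrelated-looking error.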
|
In reply to this post by Ahmed Nader
Hi Ahmed,
I am seeing the same error, which is probably caused by the KryoSerializer. Right now I'm testing a patch for this problem, so maybe you could test it as well. Unfortunately I'm using Flink 1.0.2, so I don't know for sure whether you can use my KryoSerializer, but I think so. All I do is recreate the Input and Output on every serialize/deserialize call and close them afterwards. This is my attempt to fix the problem (it is the KryoSerializer class in the flink-core module):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;

import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

/**
 * A type serializer that serializes its type using the Kryo serialization
 * framework (https://github.com/EsotericSoftware/kryo).
 *
 * This serializer is intended as a fallback serializer for the cases that are
 * not covered by the basic types, tuples, and POJOs.
 *
 * @param <T> The type to be serialized.
 */
public class KryoSerializer<T> extends TypeSerializer<T> {

    private static final long serialVersionUID = 3L;

    private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);

    // ------------------------------------------------------------------------

    private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
    private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
    private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
    private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
    private final LinkedHashSet<Class<?>> registeredTypes;

    private final Class<T> type;

    // ------------------------------------------------------------------------
    // The fields below are lazily initialized after duplication or deserialization.

    private transient Kryo kryo;
    private transient T copyInstance;

    // ------------------------------------------------------------------------

    public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
        this.type = Preconditions.checkNotNull(type);

        this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
        this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
        this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
        this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
        this.registeredTypes = executionConfig.getRegisteredKryoTypes();
    }

    /**
     * Copy-constructor that does not copy transient fields. They will be initialized once required.
     */
    protected KryoSerializer(KryoSerializer<T> toCopy) {
        registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
        registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
        defaultSerializers = toCopy.defaultSerializers;
        defaultSerializerClasses = toCopy.defaultSerializerClasses;
        registeredTypes = toCopy.registeredTypes;

        type = toCopy.type;
        if(type == null){
            throw new NullPointerException("Type class cannot be null.");
        }
    }

    // ------------------------------------------------------------------------

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public KryoSerializer<T> duplicate() {
        return new KryoSerializer<T>(this);
    }

    @Override
    public T createInstance() {
        if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) {
            return null;
        } else {
            checkKryoInitialized();
            try {
                return kryo.newInstance(type);
            } catch(Throwable e) {
                return null;
            }
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public T copy(T from) {
        if (from == null) {
            return null;
        }
        checkKryoInitialized();
        try {
            return kryo.copy(from);
        }
        catch(KryoException ke) {
            // kryo was unable to copy it, so we do it through serialization:
            ByteArrayOutputStream baout = new ByteArrayOutputStream();
            Output output = new Output(baout);

            kryo.writeObject(output, from);

            output.close();

            ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
            Input input = new Input(bain);

            return (T)kryo.readObject(input, from.getClass());
        }
    }

    @Override
    public T copy(T from, T reuse) {
        return copy(from);
    }

    @Override
    public int getLength() {
        return -1;
    }

    @Override
    public void serialize(T record, DataOutputView target) throws IOException {
        checkKryoInitialized();
        DataOutputViewStream outputStream = new DataOutputViewStream(target);
        Output output = new Output(outputStream);

        try {
            // Sanity check: Make sure that the output is cleared/has been flushed by the last call
            // otherwise data might be written multiple times in case of a previous EOFException
            if (output.position() != 0) {
                throw new IllegalStateException("The Kryo Output still contains data from a previous " +
                    "serialize call. It has to be flushed or cleared at the end of the serialize call.");
            }

            kryo.writeClassAndObject(output, record);
            output.flush();
        }
        catch (KryoException ke) {
            Throwable cause = ke.getCause();
            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            }
            else {
                throw ke;
            }
        } finally {
            try {
                output.close();
            } catch (KryoException ke) {
                Throwable cause = ke.getCause();
                if (cause instanceof EOFException) {
                    throw (EOFException) cause;
                } else {
                    throw ke;
                }
            }
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(DataInputView source) throws IOException {
        checkKryoInitialized();
        DataInputViewStream inputStream = new DataInputViewStream(source);
        Input input = new NoFetchingInput(inputStream);

        try {
            return (T) kryo.readClassAndObject(input);
        } catch (KryoException ke) {
            Throwable cause = ke.getCause();
            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            } else {
                throw ke;
            }
        } finally {
            try {
                input.close();
            } catch (KryoException ke) {
                Throwable cause = ke.getCause();
                if (cause instanceof EOFException) {
                    throw (EOFException) cause;
                } else {
                    throw ke;
                }
            }
        }
    }

    @Override
    public T deserialize(T reuse, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        checkKryoInitialized();
        if(this.copyInstance == null){
            this.copyInstance = createInstance();
        }

        T tmp = deserialize(copyInstance, source);
        serialize(tmp, target);
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return Objects.hash(
            type,
            registeredTypes,
            registeredTypesWithSerializerClasses,
            defaultSerializerClasses);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof KryoSerializer) {
            KryoSerializer<?> other = (KryoSerializer<?>) obj;

            // we cannot include the Serializers here because they don't implement the equals method
            return other.canEqual(this) &&
                type == other.type &&
                registeredTypes.equals(other.registeredTypes) &&
                registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
                defaultSerializerClasses.equals(other.defaultSerializerClasses);
        } else {
            return false;
        }
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof KryoSerializer;
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
     * Falls back to the default Kryo serializer if it can't be found.
     * @return The Kryo serializer instance.
     */
    private Kryo getKryoInstance() {

        try {
            // check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
            // This will be true if Flink's Scala API is used.
            Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
            Object chillInstantiator = chillInstantiatorClazz.newInstance();

            // obtain a Kryo instance through Twitter Chill
            Method m = chillInstantiatorClazz.getMethod("newKryo");

            return (Kryo) m.invoke(chillInstantiator);
        } catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
            IllegalAccessException | InvocationTargetException e) {

            LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);

            Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
            initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

            Kryo kryo = new Kryo();
            kryo.setInstantiatorStrategy(initStrategy);

            return kryo;
        }
    }

    private void checkKryoInitialized() {
        if (this.kryo == null) {
            this.kryo = getKryoInstance();

            // Enable reference tracking.
            kryo.setReferences(true);

            // Throwable and all subclasses should be serialized via java serialization
            kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

            // Add default serializers first, so that the type registrations without a serializer
            // are registered with a default serializer
            for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
                kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
            }

            for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
                kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
            }

            // register the type of our class
            kryo.register(type);

            // register given types. we do this first so that any registration of a
            // more specific serializer overrides this
            for (Class<?> type : registeredTypes) {
                kryo.register(type);
            }

            // register given serializer classes
            for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
                Class<?> typeClass = e.getKey();
                Class<? extends Serializer<?>> serializerClass = e.getValue();

                Serializer<?> serializer =
                    ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
                kryo.register(typeClass, serializer);
            }

            // register given serializers
            for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
                kryo.register(e.getKey(), e.getValue().getSerializer());
            }

            // this is needed for Avro but can not be added on demand.
            kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());

            kryo.setRegistrationRequired(false);
            kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
        }
    }

    // --------------------------------------------------------------------------------------------
    // For testing
    // --------------------------------------------------------------------------------------------

    public Kryo getKryo() {
        checkKryoInitialized();
        return this.kryo;
    }
}

Best,
Flavio

On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <[hidden email]> wrote:
Flavio Pompermaier Phone: +(39) 0461 283 702 |
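The idea behind recreating Input and Output on every call can be sketched with JDK classes alone. The toy below is not Kryo's Output; ReusedBufferWriter and FreshBufferWriter are hypothetical names. It assumes only what the comment in serialize() describes: a call that fails before flushing can leave bytes in a reused buffer, and those bytes are then written again on the next call.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class FreshBufferDemo {

    /** Reuses one buffer across calls; a failed call leaves stale bytes behind. */
    static final class ReusedBufferWriter {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        String write(String record, boolean failBeforeFlush) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            buffer.write(bytes, 0, bytes.length);
            if (failBeforeFlush) {
                // Simulated failure: the buffer is never reset on this path.
                throw new IllegalStateException("simulated failure before flush");
            }
            String flushed = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
            buffer.reset();
            return flushed;
        }
    }

    /** Creates a new buffer per call, so nothing can leak from one call to the next. */
    static final class FreshBufferWriter {
        String write(String record, boolean failBeforeFlush) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            buffer.write(bytes, 0, bytes.length);
            if (failBeforeFlush) {
                throw new IllegalStateException("simulated failure before flush");
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        ReusedBufferWriter reused = new ReusedBufferWriter();
        try {
            reused.write("A", true);
        } catch (IllegalStateException expected) {
            // the record "A" is lost, but its bytes are still in the buffer
        }
        System.out.println(reused.write("B", false)); // prints AB

        FreshBufferWriter fresh = new FreshBufferWriter();
        try {
            fresh.write("A", true);
        } catch (IllegalStateException expected) {
            // nothing survives this call
        }
        System.out.println(fresh.write("B", false)); // prints B
    }
}
```

The trade-off is allocation cost per record in exchange for isolation between calls; whether stale buffer contents are actually the cause of the exceptions in this thread is not confirmed here.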
In reply to this post by Till Rohrmann
Hello Till,
Thanks so much for your reply. Here's my program.
So that's TwitterSource:
public class TwitterSource extends Stream {
Then I initialize it:
List<Object> globalEntities = new ArrayList<>();
public void runModel(String key) throws Exception {
public static class processTweets extends JSONParseFlatMap<Object, Object> {
On 8 June 2016 at 16:07, Till Rohrmann <[hidden email]> wrote:
|
In reply to this post by Flavio Pompermaier
Hello Flavio, Thank you so much for replying. However, I didn't download Flink locally; I only added the dependencies to a Maven project, so I don't think I'll be able to modify the KryoSerializer class. But yes, I also think that is the problem. Thanks, Ahmed On 8 June 2016 at 16:07, Flavio Pompermaier <[hidden email]> wrote:
|
Hi Ahmed, I tried setting up your use case, and for me it all seems to work. However, I didn't use the Spring framework, and I executed the program in a local Flink cluster. Maybe you can put together a self-contained example (including example data) that reproduces your problem and send it to us. Cheers, Till On Wed, Jun 8, 2016 at 5:48 PM, Ahmed Nader <[hidden email]> wrote:
|