Null pointer exception while trying to serialize a protobuf message

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Null pointer exception while trying to serialize a protobuf message

Sridhar Chellappa
Folks,

I wrote a custom Data source to test me CEP logic. The custom data source looks like :

public class CustomerDataSource extends RichParallelSourceFunction<Customer> {
private boolean running = true;
private final Random random;

public CustomerDataSource() {
this.random = new Random();
}

@Override
public void run(SourceContext<CustomerMessage> ctx) throws Exception {
while (running) {
new CustomerDataGen().generateMessages().
forEach(element -> ctx.collect(element));

Thread.sleep(10000);
}
}

@Override
public void cancel() {
running = false;
}
}

public class CustomerDataGen {

public CustomerDataGen() {
this.random = new Random();
}
@Override
public List<CustomerMessage> generateMessages() throws InterruptedException {
List<CustomerMessage> messages = new ArrayList<CustomerMessage>();

messages.add(getMessage());
return messages;
}

private CustomerMessage getMessage() {
Instant time = Instant.now();
Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond() -1).setNanos(0).build();
return CustomerMessage.newBuilder().
setName("SomeCustomer").
setEventTimestamp(eventTimeStamp).
setCustomerId("01234").
addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
 setEmail("[hidden email]").
build();
}
}

In my Main program :

.........
env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
env.addSource(new CustomerDataSource());
env.execute();


When I run the program, I get the following exception :


Caused by: java.lang.NullPointerException
at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
at java.util.ArrayList.forEach(ArrayList.java:1249)
at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
I am having a tough time figuring out why. Can someone help me out as to where am I going wrong?

Reply | Threaded
Open this post in threaded view
|

Re: Null pointer exception while trying to serialize a protobuf message

Ted Yu
Can you show how CustomerMessage is defined ?

Thanks

On Fri, Aug 4, 2017 at 7:22 AM, Sridhar Chellappa <[hidden email]> wrote:
Folks,

I wrote a custom Data source to test me CEP logic. The custom data source looks like :

public class CustomerDataSource extends RichParallelSourceFunction<Customer> {
private boolean running = true;
private final Random random;

public CustomerDataSource() {
this.random = new Random();
}

@Override
public void run(SourceContext<CustomerMessage> ctx) throws Exception {
while (running) {
new CustomerDataGen().generateMessages().
forEach(element -> ctx.collect(element));

Thread.sleep(10000);
}
}

@Override
public void cancel() {
running = false;
}
}

public class CustomerDataGen {

public CustomerDataGen() {
this.random = new Random();
}
@Override
public List<CustomerMessage> generateMessages() throws InterruptedException {
List<CustomerMessage> messages = new ArrayList<CustomerMessage>();

messages.add(getMessage());
return messages;
}

private CustomerMessage getMessage() {
Instant time = Instant.now();
Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond() -1).setNanos(0).build();
return CustomerMessage.newBuilder().
setName("SomeCustomer").
setEventTimestamp(eventTimeStamp).
setCustomerId("01234").
addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
 setEmail("[hidden email]").
build();
}
}

In my Main program :

.........
env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
env.addSource(new CustomerDataSource());
env.execute();


When I run the program, I get the following exception :


Caused by: java.lang.NullPointerException
at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
at java.util.ArrayList.forEach(ArrayList.java:1249)
at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
I am having a tough time figuring out why. Can someone help me out as to where am I going wrong?


Reply | Threaded
Open this post in threaded view
|

Re: Null pointer exception while trying to serialize a protobuf message

Ted Yu
I searched in Flink (and hbase) for GeneratedMessageV3 but didn't find any reference.

Which version of protobuf did you use to generate the class ?

Please copy user@ in the future so that more people can help.

On Fri, Aug 4, 2017 at 8:27 AM, Sridhar Chellappa <[hidden email]> wrote:
public final class CustomerMessage extends GeneratedMessageV3 implements CustomerMessageOrBuilder {
    private int bitField0_;
    public static final int CUSTOMER_ID_FIELD_NUMBER = 1;
    private volatile Object customerId_;
    public static final int EVENT_TIMESTAMP_FIELD_NUMBER = 2;
    private Timestamp eventTimestamp_;
    public static final int NAME_FIELD_NUMBER = 4;
    private volatile Object name_;
    public static final int EMAIL_FIELD_NUMBER = 5;
    private volatile Object email_;
    public static final int PHONE_FIELD_NUMBER = 6;
    private volatile Object phone_;
    public static final int DEVICE_ID_LOGGED_IN_FROM_FIELD_NUMBER = 7;
    private volatile Object deviceIdLoggedInFrom_;
    public static final int REGISTERED_PHONES_FIELD_NUMBER = 8;
    private LazyStringList registeredPhones_;
    private byte memoizedIsInitialized;
    private static final long serialVersionUID = 0L;
    private static final CustomerMessage DEFAULT_INSTANCE = new CustomerMessage();
    private static final Parser<CustomerMessage> PARSER = new AbstractParser() {
        public CustomerMessage parsePartialFrom(CodedInputStream input, ExtensionRegistryLite extensionRegistry) throws InvalidProtocolBufferException {
            return new CustomerMessage(input, extensionRegistry, null);
        }
    };

    private CustomerMessage(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
        super(builder);
        this.memoizedIsInitialized = -1;
    }

    private CustomerMessage() {
        this.memoizedIsInitialized = -1;
        this.customerId_ = "";
        this.name_ = "";
        this.email_ = "";
        this.phone_ = "";
        this.deviceIdLoggedInFrom_ = "";
        this.registeredPhones_ = LazyStringArrayList.EMPTY;
    }

    public final UnknownFieldSet getUnknownFields() {
        return UnknownFieldSet.getDefaultInstance();
    }

    private CustomerMessage(CodedInputStream input, ExtensionRegistryLite extensionRegistry) throws InvalidProtocolBufferException {
        this();
        int mutable_bitField0_ = 0;

        try {
            boolean e = false;

            while(!e) {
                int tag = input.readTag();
                String s;
                switch(tag) {
                case 0:
                    e = true;
                    break;
                case 10:
                    s = input.readStringRequireUtf8();
                    this.customerId_ = s;
                    break;
                case 18:
                    com.google.protobuf.Timestamp.Builder s1 = null;
                    if(this.eventTimestamp_ != null) {
                        s1 = this.eventTimestamp_.toBuilder();
                    }

                    this.eventTimestamp_ = (Timestamp)input.readMessage(Timestamp.parser(), extensionRegistry);
                    if(s1 != null) {
                        s1.mergeFrom(this.eventTimestamp_);
                        this.eventTimestamp_ = s1.buildPartial();
                    }
                    break;
                case 34:
                    s = input.readStringRequireUtf8();
                    this.name_ = s;
                    break;
                case 42:
                    s = input.readStringRequireUtf8();
                    this.email_ = s;
                    break;
                case 50:
                    s = input.readStringRequireUtf8();
                    this.phone_ = s;
                    break;
                case 58:
                    s = input.readStringRequireUtf8();
                    this.deviceIdLoggedInFrom_ = s;
                    break;
                case 66:
                    s = input.readStringRequireUtf8();
                    if((mutable_bitField0_ & 64) != 64) {
                        this.registeredPhones_ = new LazyStringArrayList();
                        mutable_bitField0_ |= 64;
                    }

                    this.registeredPhones_.add(s);
                    break;
                default:
                    if(!input.skipField(tag)) {
                        e = true;
                    }
                }
            }
        } catch (InvalidProtocolBufferException var11) {
            throw var11.setUnfinishedMessage(this);
        } catch (IOException var12) {
            throw (new InvalidProtocolBufferException(var12)).setUnfinishedMessage(this);
        } finally {
            if((mutable_bitField0_ & 64) == 64) {
                this.registeredPhones_ = this.registeredPhones_.getUnmodifiableView();
            }

            this.makeExtensionsImmutable();
        }

    }

    public static final Descriptor getDescriptor() {
        return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_descriptor;
    }

    protected FieldAccessorTable internalGetFieldAccessorTable() {
        return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_fieldAccessorTable.ensureFieldAccessorsInitialized(CustomerMessage.class, CustomerMessage.Builder.class);
    }

    public String getCustomerId() {
        Object ref = this.customerId_;
        if(ref instanceof String) {
            return (String)ref;
        } else {
            ByteString bs = (ByteString)ref;
            String s = bs.toStringUtf8();
            this.customerId_ = s;
            return s;
        }
    }

    public ByteString getCustomerIdBytes() {
        Object ref = this.customerId_;
        if(ref instanceof String) {
            ByteString b = ByteString.copyFromUtf8((String)ref);
            this.customerId_ = b;
            return b;
        } else {
            return (ByteString)ref;
        }
    }

    public boolean hasEventTimestamp() {
        return this.eventTimestamp_ != null;
    }

    public Timestamp getEventTimestamp() {
        return this.eventTimestamp_ == null?Timestamp.getDefaultInstance():this.eventTimestamp_;
    }

    public TimestampOrBuilder getEventTimestampOrBuilder() {
        return this.getEventTimestamp();
    }

    public String getName() {
        Object ref = this.name_;
        if(ref instanceof String) {
            return (String)ref;
        } else {
            ByteString bs = (ByteString)ref;
            String s = bs.toStringUtf8();
            this.name_ = s;
            return s;
        }
    }

    public ByteString getNameBytes() {
        Object ref = this.name_;
        if(ref instanceof String) {
            ByteString b = ByteString.copyFromUtf8((String)ref);
            this.name_ = b;
            return b;
        } else {
            return (ByteString)ref;
        }
    }

    public String getEmail() {
        Object ref = this.email_;
        if(ref instanceof String) {
            return (String)ref;
        } else {
            ByteString bs = (ByteString)ref;
            String s = bs.toStringUtf8();
            this.email_ = s;
            return s;
        }
    }

    public ByteString getEmailBytes() {
        Object ref = this.email_;
        if(ref instanceof String) {
            ByteString b = ByteString.copyFromUtf8((String)ref);
            this.email_ = b;
            return b;
        } else {
            return (ByteString)ref;
        }
    }

    public String getPhone() {
        Object ref = this.phone_;
        if(ref instanceof String) {
            return (String)ref;
        } else {
            ByteString bs = (ByteString)ref;
            String s = bs.toStringUtf8();
            this.phone_ = s;
            return s;
        }
    }

    public ByteString getPhoneBytes() {
        Object ref = this.phone_;
        if(ref instanceof String) {
            ByteString b = ByteString.copyFromUtf8((String)ref);
            this.phone_ = b;
            return b;
        } else {
            return (ByteString)ref;
        }
    }

    public String getDeviceIdLoggedInFrom() {
        Object ref = this.deviceIdLoggedInFrom_;
        if(ref instanceof String) {
            return (String)ref;
        } else {
            ByteString bs = (ByteString)ref;
            String s = bs.toStringUtf8();
            this.deviceIdLoggedInFrom_ = s;
            return s;
        }
    }

    public ByteString getDeviceIdLoggedInFromBytes() {
        Object ref = this.deviceIdLoggedInFrom_;
        if(ref instanceof String) {
            ByteString b = ByteString.copyFromUtf8((String)ref);
            this.deviceIdLoggedInFrom_ = b;
            return b;
        } else {
            return (ByteString)ref;
        }
    }

    public ProtocolStringList getRegisteredDevicesList() {
        return this.registeredPhones_;
    }

    public int getRegisteredDevicesCount() {
        return this.registeredPhones_.size();
    }

    public String getRegisteredDevices(int index) {
        return (String)this.registeredPhones_.get(index);
    }

    public ByteString getRegisteredDevicesBytes(int index) {
        return this.registeredPhones_.getByteString(index);
    }

    public final boolean isInitialized() {
        byte isInitialized = this.memoizedIsInitialized;
        if(isInitialized == 1) {
            return true;
        } else if(isInitialized == 0) {
            return false;
        } else {
            this.memoizedIsInitialized = 1;
            return true;
        }
    }

    public void writeTo(CodedOutputStream output) throws IOException {
        if(!this.getCustomerIdBytes().isEmpty()) {
            GeneratedMessageV3.writeString(output, 1, this.customerId_);
        }

        if(this.eventTimestamp_ != null) {
            output.writeMessage(2, this.getEventTimestamp());
        }

        if(!this.getNameBytes().isEmpty()) {
            GeneratedMessageV3.writeString(output, 4, this.name_);
        }

        if(!this.getEmailBytes().isEmpty()) {
            GeneratedMessageV3.writeString(output, 5, this.email_);
        }

        if(!this.getPhoneBytes().isEmpty()) {
            GeneratedMessageV3.writeString(output, 6, this.phone_);
        }

        if(!this.getDeviceIdLoggedInFromBytes().isEmpty()) {
            GeneratedMessageV3.writeString(output, 7, this.deviceIdLoggedInFrom_);
        }

        for(int i = 0; i < this.registeredPhones_.size(); ++i) {
            GeneratedMessageV3.writeString(output, 8, this.registeredPhones_.getRaw(i));
        }

    }

    public int getSerializedSize() {
        int size = this.memoizedSize;
        if(size != -1) {
            return size;
        } else {
            size = 0;
            if(!this.getCustomerIdBytes().isEmpty()) {
                size += GeneratedMessageV3.computeStringSize(1, this.customerId_);
            }

            if(this.eventTimestamp_ != null) {
                size += CodedOutputStream.computeMessageSize(2, this.getEventTimestamp());
            }

            if(!this.getNameBytes().isEmpty()) {
                size += GeneratedMessageV3.computeStringSize(4, this.name_);
            }

            if(!this.getEmailBytes().isEmpty()) {
                size += GeneratedMessageV3.computeStringSize(5, this.email_);
            }

            if(!this.getPhoneBytes().isEmpty()) {
                size += GeneratedMessageV3.computeStringSize(6, this.phone_);
            }

            if(!this.getDeviceIdLoggedInFromBytes().isEmpty()) {
                size += GeneratedMessageV3.computeStringSize(7, this.deviceIdLoggedInFrom_);
            }

            int dataSize = 0;

            for(int i = 0; i < this.registeredPhones_.size(); ++i) {
                dataSize += computeStringSizeNoTag(this.registeredPhones_.getRaw(i));
            }

            size += dataSize;
            size += 1 * this.getRegisteredDevicesList().size();
            this.memoizedSize = size;
            return size;
        }
    }

    public boolean equals(Object obj) {
        if(obj == this) {
            return true;
        } else if(!(obj instanceof CustomerMessage)) {
            return super.equals(obj);
        } else {
            CustomerMessage other = (CustomerMessage)obj;
            boolean result = true;
            result = result && this.getCustomerId().equals(other.getCustomerId());
            result = result && this.hasEventTimestamp() == other.hasEventTimestamp();
            if(this.hasEventTimestamp()) {
                result = result && this.getEventTimestamp().equals(other.getEventTimestamp());
            }

            result = result && this.getName().equals(other.getName());
            result = result && this.getEmail().equals(other.getEmail());
            result = result && this.getPhone().equals(other.getPhone());
            result = result && this.getDeviceIdLoggedInFrom().equals(other.getDeviceIdLoggedInFrom());
            result = result && this.getRegisteredDevicesList().equals(other.getRegisteredDevicesList());
            return result;
        }
    }

    public int hashCode() {
        if(this.memoizedHashCode != 0) {
            return this.memoizedHashCode;
        } else {
            byte hash = 41;
            int hash1 = 19 * hash + this.getDescriptorForType().hashCode();
            hash1 = 37 * hash1 + 1;
            hash1 = 53 * hash1 + this.getCustomerId().hashCode();
            if(this.hasEventTimestamp()) {
                hash1 = 37 * hash1 + 2;
                hash1 = 53 * hash1 + this.getEventTimestamp().hashCode();
            }

            hash1 = 37 * hash1 + 4;
            hash1 = 53 * hash1 + this.getName().hashCode();
            hash1 = 37 * hash1 + 5;
            hash1 = 53 * hash1 + this.getEmail().hashCode();
            hash1 = 37 * hash1 + 6;
            hash1 = 53 * hash1 + this.getPhone().hashCode();
            hash1 = 37 * hash1 + 7;
            hash1 = 53 * hash1 + this.getDeviceIdLoggedInFrom().hashCode();
            if(this.getRegisteredDevicesCount() > 0) {
                hash1 = 37 * hash1 + 8;
                hash1 = 53 * hash1 + this.getRegisteredDevicesList().hashCode();
            }

            hash1 = 29 * hash1 + this.unknownFields.hashCode();
            this.memoizedHashCode = hash1;
            return hash1;
        }
    }

    public static CustomerMessage parseFrom(ByteString data) throws InvalidProtocolBufferException {
        return (CustomerMessage)PARSER.parseFrom(data);
    }

    public static CustomerMessage parseFrom(ByteString data, ExtensionRegistryLite extensionRegistry) throws InvalidProtocolBufferException {
        return (CustomerMessage)PARSER.parseFrom(data, extensionRegistry);
    }

    public static CustomerMessage parseFrom(byte[] data) throws InvalidProtocolBufferException {
        return (CustomerMessage)PARSER.parseFrom(data);
    }

    public static CustomerMessage parseFrom(byte[] data, ExtensionRegistryLite extensionRegistry) throws InvalidProtocolBufferException {
        return (CustomerMessage)PARSER.parseFrom(data, extensionRegistry);
    }

    public static CustomerMessage parseFrom(InputStream input) throws IOException {
        return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER, input);
    }

    public static CustomerMessage parseFrom(InputStream input, ExtensionRegistryLite extensionRegistry) throws IOException {
        return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER, input, extensionRegistry);
    }

    public static CustomerMessage parseDelimitedFrom(InputStream input) throws IOException {
        return (CustomerMessage)GeneratedMessageV3.parseDelimitedWithIOException(PARSER, input);
    }

    public static CustomerMessage parseDelimitedFrom(InputStream input, ExtensionRegistryLite extensionRegistry) throws IOException {
        return (CustomerMessage)GeneratedMessageV3.parseDelimitedWithIOException(PARSER, input, extensionRegistry);
    }

    public static CustomerMessage parseFrom(CodedInputStream input) throws IOException {
        return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER, input);
    }

    public static CustomerMessage parseFrom(CodedInputStream input, ExtensionRegistryLite extensionRegistry) throws IOException {
        return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER, input, extensionRegistry);
    }

    public CustomerMessage.Builder newBuilderForType() {
        return newBuilder();
    }

    public static CustomerMessage.Builder newBuilder() {
        return DEFAULT_INSTANCE.toBuilder();
    }

    public static CustomerMessage.Builder newBuilder(CustomerMessage prototype) {
        return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
    }

    public CustomerMessage.Builder toBuilder() {
        return this == DEFAULT_INSTANCE?new CustomerMessage.Builder(null):(new CustomerMessage.Builder(null)).mergeFrom(this);
    }

    protected CustomerMessage.Builder newBuilderForType(BuilderParent parent) {
        CustomerMessage.Builder builder = new CustomerMessage.Builder(parent, null);
        return builder;
    }

    public static CustomerMessage getDefaultInstance() {
        return DEFAULT_INSTANCE;
    }

    public static Parser<CustomerMessage> parser() {
        return PARSER;
    }

    public Parser<CustomerMessage> getParserForType() {
        return PARSER;
    }

    public CustomerMessage getDefaultInstanceForType() {
        return DEFAULT_INSTANCE;
    }

    public static final class Builder extends com.google.protobuf.GeneratedMessageV3.Builder<CustomerMessage.Builder> implements CustomerMessageOrBuilder {
        private int bitField0_;
        private Object customerId_;
        private Timestamp eventTimestamp_;
        private SingleFieldBuilderV3<Timestamp, com.google.protobuf.Timestamp.Builder, TimestampOrBuilder> eventTimestampBuilder_;
        private Object name_;
        private Object email_;
        private Object phone_;
        private Object deviceIdLoggedInFrom_;
        private LazyStringList registeredPhones_;

        public static final Descriptor getDescriptor() {
            return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_descriptor;
        }

        protected FieldAccessorTable internalGetFieldAccessorTable() {
            return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_fieldAccessorTable.ensureFieldAccessorsInitialized(CustomerMessage.class, CustomerMessage.Builder.class);
        }

        private Builder() {
            this.customerId_ = "";
            this.eventTimestamp_ = null;
            this.name_ = "";
            this.email_ = "";
            this.phone_ = "";
            this.deviceIdLoggedInFrom_ = "";
            this.registeredPhones_ = LazyStringArrayList.EMPTY;
            this.maybeForceBuilderInitialization();
        }

        private Builder(BuilderParent parent) {
            super(parent);
            this.customerId_ = "";
            this.eventTimestamp_ = null;
            this.name_ = "";
            this.email_ = "";
            this.phone_ = "";
            this.deviceIdLoggedInFrom_ = "";
            this.registeredPhones_ = LazyStringArrayList.EMPTY;
            this.maybeForceBuilderInitialization();
        }

        private void maybeForceBuilderInitialization() {
            if(CustomerMessage.alwaysUseFieldBuilders) {
                ;
            }

        }

        public CustomerMessage.Builder clear() {
            super.clear();
            this.customerId_ = "";
            if(this.eventTimestampBuilder_ == null) {
                this.eventTimestamp_ = null;
            } else {
                this.eventTimestamp_ = null;
                this.eventTimestampBuilder_ = null;
            }

            this.name_ = "";
            this.email_ = "";
            this.phone_ = "";
            this.deviceIdLoggedInFrom_ = "";
            this.registeredPhones_ = LazyStringArrayList.EMPTY;
            this.bitField0_ &= -65;
            return this;
        }

        public Descriptor getDescriptorForType() {
            return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_descriptor;
        }

        public CustomerMessage getDefaultInstanceForType() {
            return CustomerMessage.DEFAULT_INSTANCE;
        }

        public CustomerMessage build() {
            CustomerMessage result = this.buildPartial();
            if(!result.isInitialized()) {
                throw newUninitializedMessageException(result);
            } else {
                return result;
            }
        }

        public CustomerMessage buildPartial() {
            CustomerMessage result = new CustomerMessage(this, null);
            int from_bitField0_ = this.bitField0_;
            byte to_bitField0_ = 0;
            result.customerId_ = this.customerId_;
            if(this.eventTimestampBuilder_ == null) {
                result.eventTimestamp_ = this.eventTimestamp_;
            } else {
                result.eventTimestamp_ = (Timestamp)this.eventTimestampBuilder_.build();
            }

            result.name_ = this.name_;
            result.email_ = this.email_;
            result.phone_ = this.phone_;
            result.deviceIdLoggedInFrom_ = this.deviceIdLoggedInFrom_;
            if((this.bitField0_ & 64) == 64) {
                this.registeredPhones_ = this.registeredPhones_.getUnmodifiableView();
                this.bitField0_ &= -65;
            }

            result.registeredPhones_ = this.registeredPhones_;
            result.bitField0_ = to_bitField0_;
            this.onBuilt();
            return result;
        }

        public CustomerMessage.Builder clone() {
            return (CustomerMessage.Builder)super.clone();
        }

        public CustomerMessage.Builder setField(FieldDescriptor field, Object value) {
            return (CustomerMessage.Builder)super.setField(field, value);
        }

        public CustomerMessage.Builder clearField(FieldDescriptor field) {
            return (CustomerMessage.Builder)super.clearField(field);
        }

        public CustomerMessage.Builder clearOneof(OneofDescriptor oneof) {
            return (CustomerMessage.Builder)super.clearOneof(oneof);
        }

        public CustomerMessage.Builder setRepeatedField(FieldDescriptor field, int index, Object value) {
            return (CustomerMessage.Builder)super.setRepeatedField(field, index, value);
        }

        public CustomerMessage.Builder addRepeatedField(FieldDescriptor field, Object value) {
            return (CustomerMessage.Builder)super.addRepeatedField(field, value);
        }

        public CustomerMessage.Builder mergeFrom(Message other) {
            if(other instanceof CustomerMessage) {
                return this.mergeFrom((CustomerMessage)other);
            } else {
                super.mergeFrom(other);
                return this;
            }
        }

        public CustomerMessage.Builder mergeFrom(CustomerMessage other) {
            if(other == CustomerMessage.DEFAULT_INSTANCE) {
                return this;
            } else {
                if(!other.getCustomerId().isEmpty()) {
                    this.customerId_ = other.customerId_;
                    this.onChanged();
                }

                if(other.hasEventTimestamp()) {
                    this.mergeEventTimestamp(other.getEventTimestamp());
                }

                if(!other.getName().isEmpty()) {
                    this.name_ = other.name_;
                    this.onChanged();
                }

                if(!other.getEmail().isEmpty()) {
                    this.email_ = other.email_;
                    this.onChanged();
                }

                if(!other.getPhone().isEmpty()) {
                    this.phone_ = other.phone_;
                    this.onChanged();
                }

                if(!other.getDeviceIdLoggedInFrom().isEmpty()) {
                    this.deviceIdLoggedInFrom_ = other.deviceIdLoggedInFrom_;
                    this.onChanged();
                }

                if(!other.registeredPhones_.isEmpty()) {
                    if(this.registeredPhones_.isEmpty()) {
                        this.registeredPhones_ = other.registeredPhones_;
                        this.bitField0_ &= -65;
                    } else {
                        this.ensureRegisteredDevicesIsMutable();
                        this.registeredPhones_.addAll(other.registeredPhones_);
                    }

                    this.onChanged();
                }

                this.onChanged();
                return this;
            }
        }

        public final boolean isInitialized() {
            return true;
        }

        public CustomerMessage.Builder mergeFrom(CodedInputStream input, ExtensionRegistryLite extensionRegistry) throws IOException {
            CustomerMessage parsedMessage = null;

            try {
                parsedMessage = (CustomerMessage)CustomerMessage.PARSER.parsePartialFrom(input, extensionRegistry);
            } catch (InvalidProtocolBufferException var8) {
                parsedMessage = (CustomerMessage)var8.getUnfinishedMessage();
                throw var8.unwrapIOException();
            } finally {
                if(parsedMessage != null) {
                    this.mergeFrom(parsedMessage);
                }

            }

            return this;
        }

        public String getCustomerId() {
            Object ref = this.customerId_;
            if(!(ref instanceof String)) {
                ByteString bs = (ByteString)ref;
                String s = bs.toStringUtf8();
                this.customerId_ = s;
                return s;
            } else {
                return (String)ref;
            }
        }

        public ByteString getCustomerIdBytes() {
            Object ref = this.customerId_;
            if(ref instanceof String) {
                ByteString b = ByteString.copyFromUtf8((String)ref);
                this.customerId_ = b;
                return b;
            } else {
                return (ByteString)ref;
            }
        }

        public CustomerMessage.Builder setCustomerId(String value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                this.customerId_ = value;
                this.onChanged();
                return this;
            }
        }

        public CustomerMessage.Builder clearCustomerId() {
            this.customerId_ = CustomerMessage.DEFAULT_INSTANCE.getCustomerId();
            this.onChanged();
            return this;
        }

        public CustomerMessage.Builder setCustomerIdBytes(ByteString value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                CustomerMessage.checkByteStringIsUtf8(value);
                this.customerId_ = value;
                this.onChanged();
                return this;
            }
        }

        public boolean hasEventTimestamp() {
            return this.eventTimestampBuilder_ != null || this.eventTimestamp_ != null;
        }

        public Timestamp getEventTimestamp() {
            return this.eventTimestampBuilder_ == null?(this.eventTimestamp_ == null?Timestamp.getDefaultInstance():this.eventTimestamp_):(Timestamp)this.eventTimestampBuilder_.getMessage();
        }

        public CustomerMessage.Builder setEventTimestamp(Timestamp value) {
            if(this.eventTimestampBuilder_ == null) {
                if(value == null) {
                    throw new NullPointerException();
                }

                this.eventTimestamp_ = value;
                this.onChanged();
            } else {
                this.eventTimestampBuilder_.setMessage(value);
            }

            return this;
        }

        public CustomerMessage.Builder setEventTimestamp(com.google.protobuf.Timestamp.Builder builderForValue) {
            if(this.eventTimestampBuilder_ == null) {
                this.eventTimestamp_ = builderForValue.build();
                this.onChanged();
            } else {
                this.eventTimestampBuilder_.setMessage(builderForValue.build());
            }

            return this;
        }

        public CustomerMessage.Builder mergeEventTimestamp(Timestamp value) {
            if(this.eventTimestampBuilder_ == null) {
                if(this.eventTimestamp_ != null) {
                    this.eventTimestamp_ = Timestamp.newBuilder(this.eventTimestamp_).mergeFrom(value).buildPartial();
                } else {
                    this.eventTimestamp_ = value;
                }

                this.onChanged();
            } else {
                this.eventTimestampBuilder_.mergeFrom(value);
            }

            return this;
        }

        public CustomerMessage.Builder clearEventTimestamp() {
            if(this.eventTimestampBuilder_ == null) {
                this.eventTimestamp_ = null;
                this.onChanged();
            } else {
                this.eventTimestamp_ = null;
                this.eventTimestampBuilder_ = null;
            }

            return this;
        }

        public com.google.protobuf.Timestamp.Builder getEventTimestampBuilder() {
            this.onChanged();
            return (com.google.protobuf.Timestamp.Builder)this.getEventTimestampFieldBuilder().getBuilder();
        }

        public TimestampOrBuilder getEventTimestampOrBuilder() {
            return (TimestampOrBuilder)(this.eventTimestampBuilder_ != null?(TimestampOrBuilder)this.eventTimestampBuilder_.getMessageOrBuilder():(this.eventTimestamp_ == null?Timestamp.getDefaultInstance():this.eventTimestamp_));
        }

        private SingleFieldBuilderV3<Timestamp, com.google.protobuf.Timestamp.Builder, TimestampOrBuilder> getEventTimestampFieldBuilder() {
            if(this.eventTimestampBuilder_ == null) {
                this.eventTimestampBuilder_ = new SingleFieldBuilderV3(this.getEventTimestamp(), this.getParentForChildren(), this.isClean());
                this.eventTimestamp_ = null;
            }

            return this.eventTimestampBuilder_;
        }

        public String getName() {
            Object ref = this.name_;
            if(!(ref instanceof String)) {
                ByteString bs = (ByteString)ref;
                String s = bs.toStringUtf8();
                this.name_ = s;
                return s;
            } else {
                return (String)ref;
            }
        }

        public ByteString getNameBytes() {
            Object ref = this.name_;
            if(ref instanceof String) {
                ByteString b = ByteString.copyFromUtf8((String)ref);
                this.name_ = b;
                return b;
            } else {
                return (ByteString)ref;
            }
        }

        public CustomerMessage.Builder setName(String value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                this.name_ = value;
                this.onChanged();
                return this;
            }
        }

        public CustomerMessage.Builder clearName() {
            this.name_ = CustomerMessage.DEFAULT_INSTANCE.getName();
            this.onChanged();
            return this;
        }

        public CustomerMessage.Builder setNameBytes(ByteString value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                CustomerMessage.checkByteStringIsUtf8(value);
                this.name_ = value;
                this.onChanged();
                return this;
            }
        }

        public String getEmail() {
            Object ref = this.email_;
            if(!(ref instanceof String)) {
                ByteString bs = (ByteString)ref;
                String s = bs.toStringUtf8();
                this.email_ = s;
                return s;
            } else {
                return (String)ref;
            }
        }

        public ByteString getEmailBytes() {
            Object ref = this.email_;
            if(ref instanceof String) {
                ByteString b = ByteString.copyFromUtf8((String)ref);
                this.email_ = b;
                return b;
            } else {
                return (ByteString)ref;
            }
        }

        public CustomerMessage.Builder setEmail(String value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                this.email_ = value;
                this.onChanged();
                return this;
            }
        }

        public CustomerMessage.Builder clearEmail() {
            this.email_ = CustomerMessage.DEFAULT_INSTANCE.getEmail();
            this.onChanged();
            return this;
        }

        public CustomerMessage.Builder setEmailBytes(ByteString value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                CustomerMessage.checkByteStringIsUtf8(value);
                this.email_ = value;
                this.onChanged();
                return this;
            }
        }

        public String getPhone() {
            Object ref = this.phone_;
            if(!(ref instanceof String)) {
                ByteString bs = (ByteString)ref;
                String s = bs.toStringUtf8();
                this.phone_ = s;
                return s;
            } else {
                return (String)ref;
            }
        }

        public ByteString getPhoneBytes() {
            Object ref = this.phone_;
            if(ref instanceof String) {
                ByteString b = ByteString.copyFromUtf8((String)ref);
                this.phone_ = b;
                return b;
            } else {
                return (ByteString)ref;
            }
        }

        public CustomerMessage.Builder setPhone(String value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                this.phone_ = value;
                this.onChanged();
                return this;
            }
        }

        public CustomerMessage.Builder clearPhone() {
            this.phone_ = CustomerMessage.DEFAULT_INSTANCE.getPhone();
            this.onChanged();
            return this;
        }

        public CustomerMessage.Builder setPhoneBytes(ByteString value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                CustomerMessage.checkByteStringIsUtf8(value);
                this.phone_ = value;
                this.onChanged();
                return this;
            }
        }

        public String getDeviceIdLoggedInFrom() {
            Object ref = this.deviceIdLoggedInFrom_;
            if(!(ref instanceof String)) {
                ByteString bs = (ByteString)ref;
                String s = bs.toStringUtf8();
                this.deviceIdLoggedInFrom_ = s;
                return s;
            } else {
                return (String)ref;
            }
        }

        public ByteString getDeviceIdLoggedInFromBytes() {
            Object ref = this.deviceIdLoggedInFrom_;
            if(ref instanceof String) {
                ByteString b = ByteString.copyFromUtf8((String)ref);
                this.deviceIdLoggedInFrom_ = b;
                return b;
            } else {
                return (ByteString)ref;
            }
        }

        public CustomerMessage.Builder setDeviceIdLoggedInFrom(String value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                this.deviceIdLoggedInFrom_ = value;
                this.onChanged();
                return this;
            }
        }

        public CustomerMessage.Builder clearDeviceIdLoggedInFrom() {
            this.deviceIdLoggedInFrom_ = CustomerMessage.DEFAULT_INSTANCE.getDeviceIdLoggedInFrom();
            this.onChanged();
            return this;
        }

        public CustomerMessage.Builder setDeviceIdLoggedInFromBytes(ByteString value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                CustomerMessage.checkByteStringIsUtf8(value);
                this.deviceIdLoggedInFrom_ = value;
                this.onChanged();
                return this;
            }
        }

        private void ensureRegisteredDevicesIsMutable() {
            if((this.bitField0_ & 64) != 64) {
                this.registeredPhones_ = new LazyStringArrayList(this.registeredPhones_);
                this.bitField0_ |= 64;
            }

        }

        public ProtocolStringList getRegisteredDevicesList() {
            return this.registeredPhones_.getUnmodifiableView();
        }

        public int getRegisteredDevicesCount() {
            return this.registeredPhones_.size();
        }

        public String getRegisteredDevices(int index) {
            return (String)this.registeredPhones_.get(index);
        }

        public ByteString getRegisteredDevicesBytes(int index) {
            return this.registeredPhones_.getByteString(index);
        }

        public CustomerMessage.Builder setRegisteredDevices(int index, String value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                this.ensureRegisteredDevicesIsMutable();
                this.registeredPhones_.set(index, value);
                this.onChanged();
                return this;
            }
        }

        public CustomerMessage.Builder addRegisteredDevices(String value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                this.ensureRegisteredDevicesIsMutable();
                this.registeredPhones_.add(value);
                this.onChanged();
                return this;
            }
        }

        public CustomerMessage.Builder addAllRegisteredDevices(Iterable<String> values) {
            this.ensureRegisteredDevicesIsMutable();
            com.google.protobuf.AbstractMessageLite.Builder.addAll(values, this.registeredPhones_);
            this.onChanged();
            return this;
        }

        public CustomerMessage.Builder clearRegisteredDevices() {
            this.registeredPhones_ = LazyStringArrayList.EMPTY;
            this.bitField0_ &= -65;
            this.onChanged();
            return this;
        }

        public CustomerMessage.Builder addRegisteredDevicesBytes(ByteString value) {
            if(value == null) {
                throw new NullPointerException();
            } else {
                CustomerMessage.checkByteStringIsUtf8(value);
                this.ensureRegisteredDevicesIsMutable();
                this.registeredPhones_.add(value);
                this.onChanged();
                return this;
            }
        }

        public final CustomerMessage.Builder setUnknownFields(UnknownFieldSet unknownFields) {
            return this;
        }

        public final CustomerMessage.Builder mergeUnknownFields(UnknownFieldSet unknownFields) {
            return this;
        }
    }
}



On Fri, Aug 4, 2017 at 8:07 PM, Ted Yu <[hidden email]> wrote:
Can you show how CustomerMessage is defined ?

Thanks

On Fri, Aug 4, 2017 at 7:22 AM, Sridhar Chellappa <[hidden email]> wrote:
Folks,

I wrote a custom Data source to test me CEP logic. The custom data source looks like :

public class CustomerDataSource extends RichParallelSourceFunction<Customer> {
private boolean running = true;
private final Random random;

public CustomerDataSource() {
this.random = new Random();
}

@Override
public void run(SourceContext<CustomerMessage> ctx) throws Exception {
while (running) {
new CustomerDataGen().generateMessages().
forEach(element -> ctx.collect(element));

Thread.sleep(10000);
}
}

@Override
public void cancel() {
running = false;
}
}

public class CustomerDataGen {

public CustomerDataGen() {
this.random = new Random();
}
@Override
public List<CustomerMessage> generateMessages() throws InterruptedException {
List<CustomerMessage> messages = new ArrayList<CustomerMessage>();

messages.add(getMessage());
return messages;
}

private CustomerMessage getMessage() {
Instant time = Instant.now();
Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond() -1).setNanos(0).build();
return CustomerMessage.newBuilder().
setName("SomeCustomer").
setEventTimestamp(eventTimeStamp).
setCustomerId("01234").
addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
 setEmail("[hidden email]").
build();
}
}

In my Main program :

.........
env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
env.addSource(new CustomerDataSource());
env.execute();


When I run the program, I get the following exception :


Caused by: java.lang.NullPointerException
at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
at java.util.ArrayList.forEach(ArrayList.java:1249)
at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
I am having a tough time figuring out why. Can someone help me out as to where am I going wrong?