Folks,
I wrote a custom data source to test my CEP logic. The custom data source looks like this:
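import java.util.Random;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class CustomerDataSource extends RichParallelSourceFunction<CustomerMessage> {
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean running = true;
    private final Random random;

    public CustomerDataSource() {
        this.random = new Random();
    }

    @Override
    public void run(SourceContext<CustomerMessage> ctx) throws Exception {
        // Emit one batch of generated messages every 10 seconds until cancelled
        while (running) {
            new CustomerDataGen().generateMessages()
                .forEach(element -> ctx.collect(element));
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}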
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class CustomerDataGen {
    private final Random random;

    public CustomerDataGen() {
        this.random = new Random();
    }

    // Builds a list containing a single generated message
    public List<CustomerMessage> generateMessages() {
        List<CustomerMessage> messages = new ArrayList<>();
        messages.add(getMessage());
        return messages;
    }

    private CustomerMessage getMessage() {
        Instant time = Instant.now();
        // Timestamp is the protobuf Timestamp message type
        Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
        // Computed but not set on the message in this snippet
        Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond() - 1).setNanos(0).build();
        return CustomerMessage.newBuilder()
            .setName("SomeCustomer")
            .setEventTimestamp(eventTimeStamp)
            .setCustomerId("01234")
            .addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972"))
            .setEmail("[hidden email]")
            .build();
    }
}
In my main program:
.........
env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
env.addSource(new CustomerDataSource());
env.execute();
When I run the program, I get the following exception:
Caused by: java.lang.NullPointerException
at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
at java.util.ArrayList.forEach(ArrayList.java:1249)
at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
I am having a tough time figuring out why. Can someone help me figure out where I am going wrong?
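One possible reading of the top three frames, offered as an assumption and not a confirmed diagnosis: Kryo's CollectionSerializer.copy appears to create an empty instance of the list class and then call add() on it, and AbstractList.add(E) calls size() before anything else. If the wrapper list was instantiated without its constructor running, its inner list would be null and size() would throw. The sketch below reproduces only that mechanism in plain Java, without Flink, Kryo or protobuf. DelegatingList and NullDelegateDemo are hypothetical names, and passing null to the constructor merely simulates a skipped constructor.

```java
import java.util.AbstractList;
import java.util.List;

public class NullDelegateDemo {

    /** Hypothetical stand-in for a wrapper list such as UnmodifiableLazyStringList. */
    static class DelegatingList extends AbstractList<String> {
        private final List<String> delegate;

        DelegatingList(List<String> delegate) {
            this.delegate = delegate;
        }

        @Override
        public String get(int index) {
            return delegate.get(index);
        }

        @Override
        public int size() {
            // Throws NullPointerException when delegate is null
            return delegate.size();
        }
    }

    /** Returns true if add() on a wrapper with a null delegate fails with an NPE. */
    public static boolean addFailsWithNpe() {
        // Assumption: a null delegate models an instance created without its constructor
        List<String> copy = new DelegatingList(null);
        try {
            // AbstractList.add(E) is implemented as add(size(), e), so size() runs first
            copy.add("+9185216741972");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("add() threw NPE: " + addFailsWithNpe());
    }
}
```

Running main prints "add() threw NPE: true". If this matches what happens in the job, the failure would come from the defensive copy made between chained operators and not from the data generator itself.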