Hello everyone,
I just experienced something weird and I’d like to know if anyone has an idea of what could have happened.
I have a simple Flink cluster version 1.11.3 running on Kubernetes with a single worker.
I was testing a pipeline that connects 2 keyed streams and processes the result with a KeyedCoProcessFunction before writing the results to a database.
I enabled trace logging and started sending test input data that would generate two keys, so the job would have 4 keyed streams (2 inputs × 2 keys) that would be connected into 2 streams.
In the logs I could see the data from the 4 streams with the correct keys, but the logs of the KeyedCoProcessFunction showed data for only one of the keys, and indeed the other key was never seen in my database.
I re-submitted the job, without changing the code at all, and now it’s behaving as expected.
Is this a known issue? Has anyone else experienced something similar?
A sample of the code, in case it’s useful:
KeyedStream<A, String> allEventsWithTopology = openedEventsStream
        .getSideOutput(Filter.ALL_EVENTS_DISREGARDING_FILTER)
        .flatMap(new TopologicalPartitionKeyAssigner(…))
        .name("all-topological-events-stream")
        .uid(operatorPrefix + "all-topological-events-stream")
        .keyBy(keySelector);

DataStream<B> validCorrelationEvents = correlationEventStream
        .keyBy(new CorrelationEventKeySelector())
        .connect(allEventsWithTopology)
        .process(new CorrelationEventValidator(…))
        .name("valid-correlation-events")
        .uid(operatorPrefix + "valid-correlation-events");
Regards,
Alexis.