Data loss when connecting keyed streams


sardaesp

Hello everyone,


I just experienced something weird and I’d like to know if anyone has any idea of what could have happened.


I have a simple Flink cluster, version 1.11.3, running on Kubernetes with a single worker (one TaskManager).

I was testing a pipeline that connects two keyed streams and processes them with a KeyedCoProcessFunction before writing the results to a database.

I enabled trace logging and started sending test input data that would generate two keys, so across the two input streams the job would carry 4 keyed sub-streams (2 keys in each of the 2 streams) that would be connected into 2 streams.

In the logs I could see the data from all 4 sub-streams with the correct keys, but the KeyedCoProcessFunction only logged data for one of the keys, and indeed the other key never showed up in my database.
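As far as I understand, elements from two connected keyed streams only meet in the same keyed context of the KeyedCoProcessFunction when both KeySelectors return equal keys, which should be the case here. A rough illustration of what I mean, with placeholder types and names rather than my actual classes:

// Illustration only; CorrelationEvent, TopologyEvent, getPartitionKey()
// and SomeKeyedCoProcessFunction are placeholders, not my real classes.
KeyedStream<CorrelationEvent, String> left =
        correlationEvents.keyBy(e -> e.getPartitionKey());
KeyedStream<TopologyEvent, String> right =
        topologyEvents.keyBy(e -> e.getPartitionKey());

// Elements from both inputs with equal keys are routed to the same
// parallel instance, where they share the keyed context and state.
left.connect(right)
        .process(new SomeKeyedCoProcessFunction());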

I re-submitted the job, without changing the code at all, and now it behaves as expected.


Is this a known issue? Has anyone else experienced something similar?


A sample of the code in case it’s useful:


KeyedStream<A, String> allEventsWithTopology = openedEventsStream
        .getSideOutput(Filter.ALL_EVENTS_DISREGARDING_FILTER)
        .flatMap(new TopologicalPartitionKeyAssigner())
        .name("all-topological-events-stream")
        .uid(operatorPrefix + "all-topological-events-stream")
        .keyBy(keySelector);

DataStream<B> validCorrelationEvents = correlationEventStream
        .keyBy(new CorrelationEventKeySelector())
        .connect(allEventsWithTopology)
        .process(new CorrelationEventValidator())
        .name("valid-correlation-events")
        .uid(operatorPrefix + "valid-correlation-events");
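
The processing function itself is a KeyedCoProcessFunction along these lines. This is a stripped-down sketch, not the real implementation: the validation logic is omitted, and CorrelationEvent stands in for the element type of correlationEventStream.

import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CorrelationEventValidator
        extends KeyedCoProcessFunction<String, CorrelationEvent, A, B> {

    private static final Logger LOG =
            LoggerFactory.getLogger(CorrelationEventValidator.class);

    @Override
    public void processElement1(CorrelationEvent event, Context ctx, Collector<B> out)
            throws Exception {
        // First connected input: correlationEventStream.
        LOG.trace("correlation event, key={}", ctx.getCurrentKey());
        // ... validate against topology state for this key, emit B results ...
    }

    @Override
    public void processElement2(A event, Context ctx, Collector<B> out)
            throws Exception {
        // Second connected input: allEventsWithTopology.
        LOG.trace("topological event, key={}", ctx.getCurrentKey());
        // ... remember topology information in keyed state ...
    }
}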


Regards,

Alexis.