How to assign a UID to a KeyedStream?


Ken Krugler
Hi all,

Just for grins, I disabled auto-generated UIDs for the taxi rides/fares state example in the online tutorial. 

            env.getConfig().disableAutoGeneratedUIDs();

I then added UIDs for all operators, sources & sinks. But I still get the following when calling env.getExecutionPlan() or env.execute():

java.lang.IllegalStateException: Auto generated UIDs have been disabled but no UID or hash has been assigned to operator Partition
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
    at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)

The simple workflow is:

        DataStream<TaxiRide> rides = env
                .addSource(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor))
                .uid("source: taxi rides")
                .name("taxi rides")
                .filter((TaxiRide ride) -> ride.isStart)
                .uid("filter: only start rides")
                .name("only start rides")
                .keyBy((TaxiRide ride) -> ride.rideId);

        DataStream<TaxiFare> fares = env
                .addSource(new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor))
                .uid("source: taxi fares")
                .name("taxi fares")
                .keyBy((TaxiFare fare) -> fare.rideId);

        DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction())
                .uid("function: enrich rides with fares")
                .name("enrich rides with fares")
                .addSink(sink)
                .uid("sink: enriched taxi rides")
                .name("enriched taxi rides");

Internally, the exception is thrown when the EnrichmentFunction (a RichCoFlatMapFunction) is being transformed by StreamGraphGenerator.transformTwoInputTransform().

This calls StreamGraphGenerator.transform() with the two inputs, but the Transformation for each input is a PartitionTransformation.

I don’t see a way to set the UID following the keyBy(), as a KeyedStream creates the PartitionTransformation without a UID.

Any insight into setting the UID properly here? Or should StreamGraphGenerator.transform() skip the no-uid check for PartitionTransformation, since that’s not an operator with state?
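
To make the failure mode concrete, here is a minimal sketch that models the check in plain Java, with no Flink dependency. It is a toy model, not Flink's actual code: the Node class, the validate() method, and the skipPartitionNodes flag are all hypothetical names invented for illustration. Only the UID strings, operator names, and the error message come from the job and stack trace above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the UID check described above. NOT Flink's actual code. */
public class UidCheckSketch {

    /** Hypothetical stand-in for a Flink Transformation. */
    static class Node {
        final String name;
        final String uid;           // null means no UID was assigned
        final boolean isPartition;  // true for the node that keyBy() creates
        final List<Node> inputs;

        Node(String name, String uid, boolean isPartition, Node... inputs) {
            this.name = name;
            this.uid = uid;
            this.isPartition = isPartition;
            this.inputs = new ArrayList<>(Arrays.asList(inputs));
        }
    }

    // Message text copied from the exception in the post.
    static final String ERROR_PREFIX =
            "Auto generated UIDs have been disabled but no UID or hash "
                    + "has been assigned to operator ";

    /** Validates every input first, then the node itself. */
    static void validate(Node node, boolean skipPartitionNodes) {
        for (Node input : node.inputs) {
            validate(input, skipPartitionNodes);
        }
        // Assumption: a partition node holds no state, so it could be exempt.
        boolean exempt = skipPartitionNodes && node.isPartition;
        if (!exempt && node.uid == null) {
            throw new IllegalStateException(ERROR_PREFIX + node.name);
        }
    }

    /** Mirrors the rides/fares job: every node has a UID except the keyBy() nodes. */
    static Node buildJob() {
        Node rides = new Node("taxi rides", "source: taxi rides", false);
        Node filter = new Node("only start rides", "filter: only start rides", false, rides);
        Node keyedRides = new Node("Partition", null, true, filter);
        Node fares = new Node("taxi fares", "source: taxi fares", false);
        Node keyedFares = new Node("Partition", null, true, fares);
        Node enrich = new Node("enrich rides with fares",
                "function: enrich rides with fares", false, keyedRides, keyedFares);
        return new Node("enriched taxi rides", "sink: enriched taxi rides", false, enrich);
    }

    /** Returns "ok" if validation passes, otherwise the exception message. */
    static String check(boolean skipPartitionNodes) {
        try {
            validate(buildJob(), skipPartitionNodes);
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("strict check:   " + check(false));
        System.out.println("skip partition: " + check(true));
    }
}
```

In this model the strict check fails on the "Partition" node even though every operator, source, and sink has a UID, while exempting partition nodes lets the same graph pass. Whether the real StreamGraphGenerator should behave that way is the open question.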

Thanks,

— Ken

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr