Hi all,
[Of course, right after hitting send I realized I could just do rides.getTransformation().setUid(“blah”), ditto for the fares stream. Might be something to add to the docs, or provide a .uid() method on KeyedStreams for syntactic sugar] Just for grins, I disabled auto-generated UIDs for the taxi rides/fares state example in the online tutorial. env.getConfig().disableAutoGeneratedUIDs(); I then added UIDs for all operators, sources & sinks. But I still get the following when calling env.getExecutionPlan() or env.execute(): java.lang.IllegalStateException: Auto generated UIDs have been disabled but no UID or hash has been assigned to operator Partition at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564) at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63) The simple workflow is: DataStream<TaxiRide> rides = env .addSource(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor)) .uid("source: taxi rides") .name("taxi rides") .filter((TaxiRide ride) -> ride.isStart) .uid("filter: only start rides") .name("only start rides") .keyBy((TaxiRide ride) -> ride.rideId); DataStream<TaxiFare> fares = env .addSource(new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor)) .uid("source: taxi fares") .name("taxi fares") .keyBy((TaxiFare fare) -> fare.rideId); DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides .connect(fares) .flatMap(new EnrichmentFunction()) .uid("function: enrich rides with fares") .name("enrich rides with fares") .addSink(sink) .uid("sink: enriched taxi rides") .name("enriched taxi rides"); Internally the exception is thrown when the EnrichFunction (a RichCoFlatMapFunction) is being transformed by StreamGraphGenerator.transformTwoInputTransform(). This calls StreamGraphGenerator.transform() with the two inputs, but the Transformation for each input is a PartitionTransformation. I don’t see a way to set the UID following the keyBy(), as a KeyedStream creates the PartitionTransformation without a UID. Any insight into setting the UID properly here? Or should StreamGraphGenerator.transform() skip the no-uid check for PartitionTransformation, since that’s not an operator with state? Thanks, — Ken -------------------------- Ken Krugler custom big data solutions & training Hadoop, Cascading, Cassandra & Solr |
Hi Ken, This is actually a bug that a Partition should not require a UID. It is fixed in 1.9.2 and 1.10. see FLINK-14910. Thanks, Zhu Zhu Ken Krugler <[hidden email]> 于2020年1月10日周五 上午7:51写道:
|
Free forum by Nabble | Edit this page |