Hello all,

I have been looking into the different StreamPartitioner<T> implementations in Flink, and I noticed they each come with an implementation of selectChannel(), as defined in the ChannelSelector<T> interface. In order to better understand what a StreamPartitioner does during execution, I set up Flink on a single server with one TaskManager that had 16 slots. Then I submitted a job with a HashPartitioner (through a keyBy() transformation) and remote debugged it to see when the HashPartitioner's selectChannel() method is called. Unfortunately, the breakpoint is never reached and the job completes successfully.

Is this behavior normal? If yes, why is the breakpoint never reached? Does it have to do with running the job in an environment with local slots?

Also, what determines the number of channels when a job is executed? Does it have to do with the number of available slots in the downstream operation of the partitioner?

Thank you for your time and I appreciate any answers/comments/indications.

Kind Regards.

Nikos R. Katsipoulakis
Department of Computer Science, University of Pittsburgh
Hey Nikos,
slots are only relevant for scheduling tasks. The number of outgoing channels depends on the number of parallel subtasks that consume a produced intermediate result stream, say the result of a source operator. If you have a job with a simple source->keyBy->map flow with parallelism X you will have X outgoing channels at the source operator, one for each consuming map subtask. This is what is exposed by the underlying channel selector.

For 1.1 the mentioned HashPartitioner should be called as you describe. In 1.2 this has been replaced by the KeyGroupStreamPartitioner.

That the HashPartitioner method is not called is probably due to the fact that you are debugging this remotely. Have you tried it from within your local IDE, too?

– Ufuk

On Wed, Jan 25, 2017 at 2:12 PM, Katsipoulakis, Nikolaos Romanos <[hidden email]> wrote:
> Hello all,
> [...]
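To make the channel count concrete, here is a minimal plain-Java sketch of hash-based channel selection. It assumes only what the reply states: the number of outgoing channels equals the number of consuming subtasks, and it does not depend on slots. The class and method names are hypothetical, and the hash used here (hashCode with floorMod) is a stand-in, not the function Flink's own partitioner applies.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not Flink's HashPartitioner.
class ChannelSelectionSketch {

    // Maps a key to one of numberOfOutputChannels outgoing channels.
    // floorMod keeps the index in [0, numberOfOutputChannels) even for negative hash codes.
    static int selectChannel(Object key, int numberOfOutputChannels) {
        return Math.floorMod(key.hashCode(), numberOfOutputChannels);
    }

    // Counts how many of the given keys are routed to each channel.
    static int[] countPerChannel(List<?> keys, int numberOfOutputChannels) {
        int[] counts = new int[numberOfOutputChannels];
        for (Object key : keys) {
            counts[selectChannel(key, numberOfOutputChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed setup: the consuming operator runs with parallelism 8,
        // so the producer sees 8 outgoing channels, one per consuming subtask.
        int downstreamParallelism = 8;
        List<Integer> keys = Arrays.asList(1, 2, 3, 9, 10, 17, -1);
        System.out.println(Arrays.toString(countPerChannel(keys, downstreamParallelism)));
    }
}
```

The same key always maps to the same channel index, which is what lets a keyed downstream subtask see every record for its keys. Changing the downstream parallelism changes the number of channels and therefore the mapping.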
Hello Ufuk,
First, thank you very much for your quick reply and for clarifying the difference between channels and slots.

Regarding debugging and the breakpoint inside the HashPartitioner: I am using the IntelliJ IDE, and I have set up the environment as a Maven project with the following dependencies:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

Therefore, I assume that my local environment is using Flink 1.1. In addition, I set the parallelism to 8 for the operation after the keyBy() transformation, so that 8 sub-tasks run concurrently. Unfortunately, a breakpoint inside HashPartitioner's selectChannel() is not reached. Am I doing something wrong?

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

-----Original Message-----
From: Ufuk Celebi [mailto:[hidden email]]
Sent: Wednesday, January 25, 2017 10:06 AM
To: [hidden email]
Subject: Re: When is the StreamPartitioner<T> selectChannel() method called
Hello again,
As a follow-up: on the same topology, I set breakpoints inside the selectChannel() methods of RebalancePartitioner and ForwardPartitioner, and they are reached. Unfortunately, the same breakpoint set on the HashPartitioner is not reached. To make sure that an instance of a HashPartitioner is created for the keyBy() transformation, I set a breakpoint inside the StreamGraphGenerator transform() method and verified that a HashPartitioner object is created.

My topology is submitted in the following way:

    // Phase 0: input setup
    DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(…)
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> event) {
                return event.f0;
            }
        })
        .map((Tuple2<Long, Integer> e) -> new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1));

    // Phase 1: parallel partial sum, with a parallelism of N (N > 1)
    DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
        .keyBy(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .sum(2)
        .setParallelism(N);

    // Phase 2: serial full aggregation and ordering, with a parallelism of 1
    DataStream<String> phaseTwo = phaseOne
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Tuple3<Long, Integer, Integer>> values,
                              Collector<String> out) throws Exception {
                ...
                List<Integer> topTenValues = ...;
                StringBuilder strBuilder = new StringBuilder();
                for (Integer t : topTenValues)
                    strBuilder.append(Integer.toString(t) + ",");
                out.collect(strBuilder.toString());
            }
        });

Am I missing something or is this a bug?

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

-----Original Message-----
From: Katsipoulakis, Nikolaos Romanos [mailto:[hidden email]]
Sent: Wednesday, January 25, 2017 10:33 AM
To: [hidden email]
Subject: RE: When is the StreamPartitioner<T> selectChannel() method called