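Hi guys,

I am trying to test a job that should run a number of tasks to read from an RDBMS using an improved JDBC connector. The connection and the reading run smoothly, but I cannot seem to get above the limit of 8 concurrently running threads. 8 is of course the number of cores of my machine.

I have tried working around configurations and settings, but the Executor within the ExecutionContext keeps on having a parallelism of 8, although the parallelism of the execution environment is much higher (in fact I have many more tasks to be allocated).

I feel it may be an issue of the LocalMiniCluster configuration, which may just override or neglect my wish for a higher degree of parallelism. Is there a way for me to work around this issue?

Please let me know. Thanks a lot for your help! :-)

saluti,
Stefano
|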
Any help here? I think that the problem is that the JobManager creates the executionContext of the scheduler with

    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

and thus the number of concurrently running threads is limited to the number of cores (using the default constructor of the ForkJoinPool). What do you think?

On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli <[hidden email]> wrote:
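The claim about the default constructor can be checked in isolation. The following is a minimal JDK-only sketch (the class and method names are made up for illustration, and it says nothing about how Flink actually uses the pool): it compares the parallelism of a default-constructed ForkJoinPool with the number of available processors, and with an explicitly sized pool.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class, not part of Flink.
class ForkJoinDefaultDemo {

    // Parallelism of a pool created with the no-argument constructor.
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // Parallelism of a pool created with an explicit target parallelism.
    static int explicitParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cores:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("default:  " + defaultParallelism());
        System.out.println("explicit: " + explicitParallelism(32));
    }
}
```

On an 8-core machine the first two numbers should both be 8, which matches the observation in the first message. Whether this pool is what limits the tasks is a separate question.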
|
Hey Stefano,
this should work by setting the parallelism on the environment, e.g.

    env.setParallelism(32)

Is this what you are doing?

The task threads are not part of a pool, but each submitted task creates its own Thread.

– Ufuk

On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier <[hidden email]> wrote:
|
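To illustrate the point that one dedicated thread per task is not bound by the core count, here is a small JDK-only sketch (the class and method names are hypothetical and this is not Flink code). It starts one thread per task and makes every thread wait at a barrier that only opens when all of them are alive at the same time.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Flink.
class ThreadPerTaskDemo {

    // Starts one dedicated thread per task and returns how many of them
    // passed a barrier that requires all tasks to be waiting simultaneously.
    static int tasksRunningTogether(int parallelism) {
        CyclicBarrier barrier = new CyclicBarrier(parallelism);
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await(10, TimeUnit.SECONDS);
                    passed.incrementAndGet();
                } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                    // Not all tasks reached the barrier in time; do not count this one.
                }
            }, "task-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(tasksRunningTogether(32) + " of 32 tasks were alive at the same time");
    }
}
```

If the threads were handed to a pool limited to the number of cores instead, the barrier would never open on an 8-core machine, because only 8 tasks could wait at it at once.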
Well, in theory yes. Each task has a thread, but only a certain number run in parallel (that is the job of the scheduler). Parallelism is set in the environment. However, while the parallelism parameter is set and read correctly, when it comes to actually starting the threads, the number is fixed at 8. We ran a debugger to get to the point where the thread was started. As Flavio mentioned, the ExecutionContext has the parallelism set to 8. We have a pool of connections to an RDBMS and it logs the creation of just 8 connections, although the parallelism is much higher.

My question is whether this is a bug (or a feature) of the LocalMiniCluster. :-) I am not a Scala expert, but I see some variable assignments in the setup of the MiniCluster involving parallelism and 'default values'. Default values in terms of parallelism are based on the number of cores.

Thanks a lot for the support!

saluti,
Stefano

2016-03-29 11:51 GMT+02:00 Ufuk Celebi <[hidden email]>:
|
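Hi,

for what do you use the ExecutionContext? That should actually be something which you shouldn't be concerned with, since it is only used internally by the runtime.

Cheers,
Till

On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <[hidden email]> wrote: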
|
In fact, I don't use it. I just had to trace back through the runtime implementation to get to the point where the parallelism was switching from 32 to 8.

saluti,
Stefano

2016-03-29 12:24 GMT+02:00 Till Rohrmann <[hidden email]>:
|
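Then it shouldn't be a problem. The ExecutionContext is used to run futures and their callbacks. But as Ufuk said, each task will spawn its own thread, and if you set the parallelism to 32 then you should have 32 threads running.

On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <[hidden email]> wrote: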
|
That is exactly my point. I should have 32 threads running, but I have only 8. 32 tasks are created, but only 8 are run concurrently. Flavio and I will try to make a simple program to reproduce the problem. If we solve our issues on the way, we'll let you know.

Thanks a lot anyway.

saluti,
Stefano

2016-03-29 12:44 GMT+02:00 Till Rohrmann <[hidden email]>:
|
Perhaps there is a misunderstanding on my side over the parallelism and split management given a data source.

We started from the current JDBCInputFormat to make it multi-threaded. Then, given a space of keys, we create the splits based on a fetch size set as a parameter. In open(), we get a connection from the pool and execute a query using the split interval. This sets the 'resultSet', and then the DataSourceTask iterates between reachedEnd, next and close. On close, the connection is returned to the pool. We set the parallelism to 32, and we would expect 32 connections to be opened, but only 8 are opened.

We tried to make an example with the TextInputFormat, but since it is a DelimitedInputFormat, open is called sequentially when the statistics are built, and the processing is executed in parallel only after all the opens have been executed. This is not feasible in our case, because there would be millions of queries before the statistics are collected.

Perhaps we are doing something wrong, still to figure out what. :-/

Thanks a lot for your help.

saluti,
Stefano

2016-03-29 13:30 GMT+02:00 Stefano Bortoli <[hidden email]>:
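The split creation described above (a key space cut into intervals of at most one fetch size each) could look roughly like the following sketch. It is plain JDK code, not the actual input format from this thread; the class name, the method name, and the `id` column in the sample query are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not taken from the input format discussed here.
class KeyRangeSplits {

    // Cuts the inclusive key range [minKey, maxKey] into consecutive
    // inclusive intervals {start, end} of at most fetchSize keys each.
    static List<long[]> compute(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        List<long[]> splits = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += fetchSize) {
            long end = Math.min(start + fetchSize - 1, maxKey);
            splits.add(new long[] {start, end});
        }
        return splits;
    }

    public static void main(String[] args) {
        List<long[]> splits = compute(1, 1000, 10);
        System.out.println(splits.size() + " splits");
        // Each split would parameterize one query in open(); the column name is assumed.
        long[] first = splits.get(0);
        System.out.println("SELECT * FROM users WHERE id BETWEEN " + first[0] + " AND " + first[1]);
    }
}
```

With many more splits than parallel tasks, each task would process several splits one after another, so the number of connections open at any moment depends on how many tasks run at once and on how many connections the pool is willing to hand out.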
|
Do you have the code somewhere online? Maybe someone can have a quick
look over it later. I'm pretty sure that is indeed a problem with the custom input format. – Ufuk On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <[hidden email]> wrote:
|
Hi Ufuk, here is our preliminary input format implementation: https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119

2016-03-30 10:13 GMT+02:00 Ufuk Celebi <[hidden email]>:
|
We've finally created a running example (for Flink 0.10.2) of our improved JDBC InputFormat that you can run from an IDE (it creates an in-memory Derby database with 1000 rows and reads it in batches of 10) at https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.

The first time you run the program you have to comment out the following line:

    stmt.executeUpdate("Drop Table users ");

In your pom declare the following dependencies:

    <dependency>
      <groupId>org.apache.derby</groupId>
      <artifactId>derby</artifactId>
      <version>10.10.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.4.2</version>
    </dependency>

On my laptop I have 8 cores, and if I set the parallelism to 16 I expect to see 16 calls to the connection pool (i.e. '==================== CREATING NEW CONNECTION!'), while I see only 8 (up to my maximum number of cores). The number of created tasks, however, is correct (16).

I hope this helps in understanding where the problem is!

Best and thanks in advance,
Flavio

On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <[hidden email]> wrote:
|
Any feedback about our JDBC InputFormat issue..?
On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier <[hidden email]> wrote:
|
Sounds actually not like a Flink issue. I would look into the commons-pool docs. Maybe they size their pools by default with the number of cores, so the pool has only 8 threads, and other requests are queued? On Wed, Apr 13, 2016 at 10:29 AM, Flavio Pompermaier <[hidden email]> wrote:
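The effect suggested here can be reproduced without Flink or a database. As far as I know, commons-pool2's GenericObjectPool defaults to a maxTotal of 8 (a fixed default rather than one derived from the core count) and blocks borrowers once it is exhausted, and the limit can be raised with setMaxTotal on the pool or its config. The sketch below does not use commons-pool at all: it is a JDK-only stand-in where a Semaphore plays the role of the pool limit, and all names in it are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a bounded object pool; not commons-pool code.
class BoundedPoolDemo {

    // Lets `borrowers` threads borrow from a pool capped at `maxTotal`
    // and returns the highest number of objects borrowed at the same time.
    static int peakBorrowed(int maxTotal, int borrowers) {
        Semaphore permits = new Semaphore(maxTotal);
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        int expected = Math.min(maxTotal, borrowers);
        CountDownLatch full = new CountDownLatch(expected);
        CountDownLatch release = new CountDownLatch(1);
        Thread[] threads = new Thread[borrowers];
        for (int i = 0; i < borrowers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    permits.acquire();                 // "borrow": blocks when the pool is exhausted
                    int now = inUse.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    full.countDown();
                    release.await();                   // hold the object until the pool has filled up
                    inUse.decrementAndGet();
                    permits.release();                 // "return" the object to the pool
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        try {
            full.await();                              // wait until the pool is as full as it can get
            release.countDown();
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("cap 8,  32 borrowers: peak " + peakBorrowed(8, 32));
        System.out.println("cap 32, 32 borrowers: peak " + peakBorrowed(32, 32));
    }
}
```

With a cap of 8, all 32 threads exist but only 8 hold a pooled object at any time, which would look exactly like "32 tasks created, only 8 connections opened". Raising the cap to the job's parallelism removes the limit.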
|
Sounds like you are damn right! Thanks for the insight, dumb of us for not checking this before.

saluti,
Stefano

2016-04-13 11:05 GMT+02:00 Stephan Ewen <[hidden email]>:
|
No problem ;-) On Wed, Apr 13, 2016 at 11:38 AM, Stefano Bortoli <[hidden email]> wrote:
|