threads, parallelism and task managers

Stefano Bortoli
Hi guys,

I am trying to test a job that should run a number of tasks reading from an RDBMS using an improved JDBC connector. The connection and the reading run smoothly, but I cannot get above 8 concurrently running threads. 8 is, of course, the number of cores on my machine.

I have tried various configurations and settings, but the Executor within the ExecutionContext always has a parallelism of 8, even though the parallelism of the execution environment is much higher (in fact, I have many more tasks to be allocated).

I suspect the LocalMiniCluster configuration overrides or ignores my request for a higher degree of parallelism. Is there a way to work around this issue?

Please let me know. Thanks a lot for your help! :-)

saluti,
Stefano
Re: threads, parallelism and task managers

Flavio Pompermaier
Any help here? I think that the problem is that the JobManager creates the executionContext of the scheduler with

       val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

and thus the number of concurrently running threads is limited to the number of cores (using the default constructor of the ForkJoinPool).
What do you think?
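
For reference, the default that this message relies on can be checked with the JDK alone. The following is a minimal sketch, assuming that java.util.concurrent.ForkJoinPool behaves like the pool used in the Scala line above (an assumption; the class name ForkJoinDefaults and its methods are made up for illustration). It only shows what the no-argument constructor does. It does not show that this pool limits the threads that run Flink tasks.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class, for illustration only.
class ForkJoinDefaults {

    // Parallelism of a pool created with the no-argument constructor.
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // Parallelism of a pool created with an explicit target.
    static int explicitParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("default pool parallelism: " + defaultParallelism());
        System.out.println("explicit pool parallelism: " + explicitParallelism(32));
    }
}
```

On a machine with 8 cores, the default pool would therefore report a parallelism of 8, while a pool constructed with an explicit argument reports that argument.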


In reply to Stefano Bortoli's message of Wed, Mar 23, 2016 at 6:55 PM.

Re: threads, parallelism and task managers

Ufuk Celebi
Hey Stefano,

This should work by setting the parallelism on the environment, e.g.

env.setParallelism(32)

Is this what you are doing?

The task threads are not part of a pool; each submitted task
creates its own Thread.
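
The difference between pooled and dedicated threads can be illustrated without Flink. Below is a minimal sketch in plain Java (ThreadPerTaskSketch is a hypothetical name, and none of this is Flink code). Each task gets its own Thread and waits until every task has started, which can only succeed if all of them are alive at the same time, whatever the core count.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "one dedicated thread per task"; not Flink code.
class ThreadPerTaskSketch {

    // Returns true if all tasks were alive at the same time.
    static boolean allRunConcurrently(int tasks) {
        CountDownLatch allStarted = new CountDownLatch(tasks);
        CountDownLatch allPassed = new CountDownLatch(tasks);

        for (int i = 0; i < tasks; i++) {
            Thread t = new Thread(() -> {
                allStarted.countDown();
                try {
                    // Block until every other task has started as well.
                    if (allStarted.await(10, TimeUnit.SECONDS)) {
                        allPassed.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "task-" + i);
            t.setDaemon(true);
            t.start();
        }

        try {
            return allPassed.await(15, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("32 tasks alive at once: " + allRunConcurrently(32));
    }
}
```

If the same tasks were submitted to a fixed pool of 8 threads instead, the first 8 would wait for tasks that never get a thread, and the call would time out and return false.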

– Ufuk


In reply to Flavio Pompermaier's message of Fri, Mar 25, 2016 at 9:10 PM.
Re: threads, parallelism and task managers

Stefano Bortoli
Well, in theory yes. Each task has a thread, but only a certain number run in parallel (that is the scheduler's job). Parallelism is set in the environment. However, although the parallelism parameter is set and read correctly, when it comes to actually starting the threads, the number is fixed at 8. We ran a debugger to get to the point where the thread was started. As Flavio mentioned, the ExecutionContext has the parallelism set to 8. We have a pool of connections to an RDBMS, and it logs the creation of just 8 connections although the parallelism is much higher.

My question is whether this is a bug (or a feature) of the LocalMiniCluster. :-) I am not a Scala expert, but I see some variable assignments in the setup of the MiniCluster involving parallelism and 'default values'. The default values for parallelism are based on the number of cores.

Thanks a lot for the support!

saluti,
Stefano

In reply to Ufuk Celebi's message of 2016-03-29 11:51 GMT+02:00.

Re: threads, parallelism and task managers

Till Rohrmann-2

Hi,

What do you use the ExecutionContext for? It should not be something you need to be concerned with, since it is only used internally by the runtime.

Cheers,
Till


In reply to Stefano Bortoli's message of Tue, Mar 29, 2016 at 12:09 PM.


Re: threads, parallelism and task managers

Stefano Bortoli
In fact, I don't use it. I just had to trace back through the runtime implementation to find the point where the parallelism switched from 32 to 8.

saluti,
Stefano

In reply to Till Rohrmann's message of 2016-03-29 12:24 GMT+02:00.



Re: threads, parallelism and task managers

Till Rohrmann

Then it shouldn’t be a problem. The ExecutionContext is used to run futures and their callbacks. But as Ufuk said, each task will spawn its own thread, and if you set the parallelism to 32 then you should have 32 threads running.


In reply to Stefano Bortoli's message of Tue, Mar 29, 2016 at 12:29 PM.




Re: threads, parallelism and task managers

Stefano Bortoli
That is exactly my point. I should have 32 threads running, but I have only 8. 32 tasks are created, but only 8 run concurrently. Flavio and I will try to write a simple program to reproduce the problem. If we solve our issues on the way, we'll let you know.
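
One way to check how many tasks really overlap, independently of Flink internals, is to count entries and exits around the code of interest (for example, around the open and close of an input format). The following is a minimal sketch in plain Java. PeakConcurrencyProbe, enter, exit and measure are hypothetical names, and the fixed-size pool in measure is only a stand-in for whatever is limiting concurrency in the real job.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical probe that records the highest number of overlapping calls.
class PeakConcurrencyProbe {
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    // Call at the start of the section being measured.
    void enter() {
        int now = active.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
    }

    // Call at the end of the section being measured.
    void exit() {
        active.decrementAndGet();
    }

    int peak() {
        return peak.get();
    }

    // Runs `tasks` short jobs on a pool of `workers` threads and returns the observed peak.
    static int measure(int workers, int tasks) {
        PeakConcurrencyProbe probe = new PeakConcurrencyProbe();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                probe.enter();
                try {
                    Thread.sleep(20); // simulated work, e.g. a query
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    probe.exit();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return probe.peak();
    }

    public static void main(String[] args) {
        System.out.println("peak with 8 workers, 32 tasks: " + measure(8, 32));
    }
}
```

With 8 workers the reported peak can never exceed 8, even though 32 tasks are submitted. A peak of 8 in the real job would point to a limit of that kind somewhere between task creation and the measured section.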

thanks a lot anyway.

saluti,
Stefano

In reply to Till Rohrmann's message of 2016-03-29 12:44 GMT+02:00.





Re: threads, parallelism and task managers

Stefano Bortoli
Perhaps there is a misunderstanding on my side about parallelism and split management for a data source.

We started from the current JDBCInputFormat and made it multi-threaded. Then, given a space of keys, we create the splits based on a fetch size set as a parameter. In the open method, we get a connection from the pool and execute a query using the split interval. This sets the 'resultSet', and then the DataSourceTask iterates through reachedEnd, next and close. On close, the connection is returned to the pool. We set the parallelism to 32, so we would expect 32 open connections, but only 8 are opened.
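
The split creation described above might look roughly like the following. This is a sketch under stated assumptions, not the authors' actual code: keys are taken to be contiguous, non-negative long values, each split is an inclusive [from, to] interval of at most fetchSize keys, and KeyRangeSplits and split are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: cuts a key space into intervals of at most fetchSize keys.
class KeyRangeSplits {

    // Each element is {from, to}, both inclusive. Assumes 0 <= minKey.
    static List<long[]> split(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        List<long[]> splits = new ArrayList<>();
        long from = minKey;
        while (from <= maxKey) {
            // The last interval may be shorter than fetchSize.
            long to = (maxKey - from < fetchSize) ? maxKey : from + fetchSize - 1;
            splits.add(new long[] {from, to});
            if (to == maxKey) {
                break;
            }
            from = to + 1;
        }
        return splits;
    }

    public static void main(String[] args) {
        for (long[] s : split(1, 10, 4)) {
            System.out.println(s[0] + " .. " + s[1]);
        }
    }
}
```

Each interval would then parameterize one query when its split is opened, for instance a range predicate on the key column (again an assumption about how the query is written).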

We tried to build an example with TextInputFormat, but since it is a DelimitedInputFormat, open is called sequentially while the statistics are built, and the processing runs in parallel only after all the open calls have been executed. This is not feasible in our case, because there would be millions of queries before the statistics are collected.

Perhaps we are doing something wrong; we still have to figure out what. :-/

Thanks a lot for your help.

saluti,
Stefano


In reply to Stefano Bortoli's message of 2016-03-29 13:30 GMT+02:00.






Re: threads, parallelism and task managers

Ufuk Celebi
Do you have the code somewhere online? Maybe someone can have a quick
look over it later. I'm pretty sure that is indeed a problem with the
custom input format.

– Ufuk

In reply to Stefano Bortoli's message of Tue, Mar 29, 2016 at 3:50 PM.
Re: threads, parallelism and task managers

Stefano Bortoli
Hi Ufuk,

here is our preliminary input formar implementation:
https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119

if you need a running project, I will have to create a test one, because I cannot share the current configuration.

thanks a lot in advance!





Re: threads, parallelism and task managers

Flavio Pompermaier
We've finally created a running example (for Flink 0.10.2) of our improved JDBC InputFormat that you can run from an IDE (it creates an in-memory Derby database with 1000 rows and reads in batches of 10) at https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
The first time you run the program, you have to comment out the following line:

        stmt.executeUpdate("Drop Table users ");

In your pom declare the following dependencies:

<dependency>
  <groupId>org.apache.derby</groupId>
  <artifactId>derby</artifactId>
  <version>10.10.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-pool2</artifactId>
  <version>2.4.2</version>
</dependency>

My laptop has 8 cores, and if I set the parallelism to 16 I expect to see 16 calls to the connection pool (i.e. '==================== CREATING NEW CONNECTION!'), but I see only 8 (which matches my number of cores).
The number of created tasks, however, is correct (16).

I hope this helps in understanding where the problem is!

Best, and thanks in advance,
Flavio
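
One way to tell "only 8 tasks run at once" apart from "only 8 connections get created" is to measure the peak number of concurrently active workers separately from the pool log. Below is a minimal Java sketch of such a probe. The class PeakConcurrencyProbe and its methods are hypothetical and not part of Flink or of the gist; the idea of calling enter() at the very start of open() and exit() in close() is an assumption about where it would be hooked in, and a shared counter like this would only be meaningful in a local, single-JVM run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class PeakConcurrencyProbe {

    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    // Call when a worker starts (for example, before borrowing a connection).
    public void enter() {
        int now = active.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
    }

    // Call when the worker is finished.
    public void exit() {
        active.decrementAndGet();
    }

    public int peak() {
        return peak.get();
    }

    // Stand-alone demonstration with plain threads instead of Flink tasks.
    public static int measure(int threads) {
        PeakConcurrencyProbe probe = new PeakConcurrencyProbe();
        CountDownLatch allEntered = new CountDownLatch(threads);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                probe.enter();
                allEntered.countDown();
                try {
                    allEntered.await(); // hold until every worker has entered
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    probe.exit();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return probe.peak();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent workers: " + measure(16));
    }
}
```

If a probe like this reported a peak of 16 inside the job while the pool still logged only 8 new connections, the limit would be on the pool side rather than in the scheduling of tasks.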



Re: threads, parallelism and task managers

Flavio Pompermaier
Any feedback about our JDBC InputFormat issue?




Re: threads, parallelism and task managers

Stephan Ewen
This actually doesn't sound like a Flink issue. I would look into the Commons Pool docs.
Maybe they size their pools by default to the number of cores, so the pool holds only 8 connections, and other requests are queued?
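
The effect described here can be reproduced without Flink or Commons Pool. The sketch below is plain Java; CappedPool is a hypothetical stand-in for a blocking object pool with a fixed maximum size, not the Commons Pool implementation. It starts one thread per task, as Flink is said to do in this thread, and counts how many objects the pool has to create. For reference, in commons-pool2 the default cap of a GenericObjectPool is a fixed 8 objects (GenericObjectPoolConfig.DEFAULT_MAX_TOTAL) rather than the core count, and borrowers block by default when the pool is exhausted. Whether the gist uses such a pool with its defaults is an assumption here.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolCapDemo {

    // Hypothetical stand-in for a blocking object pool with a fixed maximum size.
    static final class CappedPool {
        private final Semaphore permits;
        private final ConcurrentLinkedQueue<Object> idle = new ConcurrentLinkedQueue<>();
        private final AtomicInteger created = new AtomicInteger();

        CappedPool(int maxTotal) {
            this.permits = new Semaphore(maxTotal);
        }

        Object borrow() throws InterruptedException {
            permits.acquire();              // blocks while maxTotal objects are borrowed
            Object conn = idle.poll();      // reuse a returned object if there is one
            if (conn == null) {
                created.incrementAndGet();  // plays the role of "CREATING NEW CONNECTION!"
                conn = new Object();
            }
            return conn;
        }

        void giveBack(Object conn) {
            idle.add(conn);                 // make the object reusable before freeing the permit
            permits.release();
        }

        int createdCount() {
            return created.get();
        }
    }

    // Starts one thread per task and returns how many objects the pool had to create.
    public static int run(int tasks, int maxTotal) {
        CappedPool pool = new CappedPool(maxTotal);
        // The first wave keeps its objects until the pool is saturated,
        // which makes the result independent of thread timing.
        CountDownLatch firstWave = new CountDownLatch(Math.min(tasks, maxTotal));
        Thread[] workers = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            workers[i] = new Thread(() -> {
                try {
                    Object conn = pool.borrow();
                    try {
                        firstWave.countDown();
                        firstWave.await();
                    } finally {
                        pool.giveBack(conn);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return pool.createdCount();
    }

    public static void main(String[] args) {
        System.out.println("tasks=16, cap=8  -> created " + run(16, 8));
        System.out.println("tasks=16, cap=32 -> created " + run(16, 32));
    }
}
```

With a cap of 8 the sketch creates 8 objects for 16 tasks, and 16 once the cap is raised to 32. With commons-pool2 itself, the corresponding setting would be GenericObjectPoolConfig.setMaxTotal (together with setMaxIdle if returned objects should stay in the pool).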






Re: threads, parallelism and task managers

Stefano Bortoli
Sounds like you are damn right! Thanks for the insight; dumb of us not to check this before.

saluti,
Stefano

2016-04-13 11:05 GMT+02:00 Stephan Ewen <[hidden email]>:
Sounds actually not like a Flink issue. I would look into the commons pool docs.
Maybe they size their pools by default with the number of cores, so the pool has only 8 threads, and other requests are queues?

On Wed, Apr 13, 2016 at 10:29 AM, Flavio Pompermaier <[hidden email]> wrote:
Any feedback about our JDBC InputFormat issue..?

On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier <[hidden email]> wrote:
We've finally created a running example (For Flink 0.10.2) of our improved JDBC imputformat that you can run from an IDE (it creates an in-memory derby database with 1000 rows and batch of 10) at https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
The first time you run the program you have to comment the following line:

        stmt.executeUpdate("Drop Table users ");

In your pom declare the following dependencies:

<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>

In my laptop I have 8 cores and if I put parallelism to 16 I expect to see 16 calls to the connection pool (i.e. '==================== CREATING NEW CONNECTION!') while I see only 8 (up to my maximum number of cores).
The number of created task instead is correct (16).

I hope this could help in understanding where the problem is!

Best and thank in advance,
Flavio

On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <[hidden email]> wrote:
Hi Ufuk,

here is our preliminary input formar implementation:
https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119

if you need a running project, I will have to create a test one cause I cannot share the current configuration.

thanks a lot in advance!



2016-03-30 10:13 GMT+02:00 Ufuk Celebi <[hidden email]>:
Do you have the code somewhere online? Maybe someone can have a quick
look over it later. I'm pretty sure that is indeed a problem with the
custom input format.

– Ufuk

On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <[hidden email]> wrote:
> Perhaps there is a misunderstanding on my side about parallelism and
> split management for a data source.
>
> We started from the current JDBCInputFormat and made it multi-threaded. Then,
> given a space of keys, we create the splits based on a fetch size set as a
> parameter. In open, we get a connection from the pool and execute a
> query using the split interval. This sets the 'resultSet', and then the
> DataSourceTask iterates over reachedEnd, next and close. On close, the
> connection is returned to the pool. We set the parallelism to 32 and would
> expect 32 connections to be opened, but only 8 are opened.
>
> We tried to make an example with the TextInputFormat, but since it is a
> DelimitedInputFormat, open is called sequentially while the statistics are
> built, and the processing is executed in parallel only after all the
> opens have been executed. This is not feasible in our case, because there would be
> millions of queries before the statistics are collected.
>
> Perhaps we are doing something wrong; we still have to figure out what. :-/
>
> thanks a lot for your help.
>
> saluti,
> Stefano
>
>
> 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <[hidden email]>:
>>
>> That is exactly my point. I should have 32 threads running, but I have
>> only 8. 32 tasks are created, but only 8 are run concurrently. Flavio
>> and I will try to make a simple program to reproduce the problem. If we solve
>> our issues on the way, we'll let you know.
>>
>> thanks a lot anyway.
>>
>> saluti,
>> Stefano
>>
>> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <[hidden email]>:
>>>
>>> Then it shouldn’t be a problem. The ExecutionContext is used to run
>>> futures and their callbacks. But as Ufuk said, each task will spawn its own
>>> thread and if you set the parallelism to 32 then you should have 32 threads
>>> running.
>>>
>>>
>>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <[hidden email]>
>>> wrote:
>>>>
>>>> In fact, I don't use it. I just had to trace back through the runtime
>>>> implementation to get to the point where the parallelism was switching from 32
>>>> to 8.
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <[hidden email]>:
>>>>>
>>>>> Hi,
>>>>>
>>>>> what do you use the ExecutionContext for? That should actually be
>>>>> something which you shouldn’t be concerned with, since it is only used
>>>>> internally by the runtime.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>>
>>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <[hidden email]>
>>>>> wrote:
>>>>>>
>>>>>> Well, in theory yes. Each task has a thread, but only a certain number run
>>>>>> in parallel (the job of the scheduler). Parallelism is set in the
>>>>>> environment. However, whereas the parallelism parameter is set and read
>>>>>> correctly, when it comes to actually starting the threads, the number is
>>>>>> fixed at 8. We ran a debugger to get to the point where the thread was
>>>>>> started. As Flavio mentioned, the ExecutionContext has the parallelism set
>>>>>> to 8. We have a pool of connections to an RDBMS and it logs the creation of
>>>>>> just 8 connections although the parallelism is much higher.
>>>>>>
>>>>>> My question is whether this is a bug (or a feature) of the
>>>>>> LocalMiniCluster. :-) I am not a Scala expert, but I see some variable
>>>>>> assignments in the setup of the MiniCluster, involving parallelism and
>>>>>> 'default values'. Default values in terms of parallelism are based on the
>>>>>> number of cores.
>>>>>>
>>>>>> thanks a lot for the support!
>>>>>>
>>>>>> saluti,
>>>>>> Stefano
>>>>>>
>>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <[hidden email]>:
>>>>>>>
>>>>>>> Hey Stefano,
>>>>>>>
>>>>>>> this should work by setting the parallelism on the environment, e.g.
>>>>>>>
>>>>>>> env.setParallelism(32)
>>>>>>>
>>>>>>> Is this what you are doing?
>>>>>>>
>>>>>>> The task threads are not part of a pool, but each submitted task
>>>>>>> creates its own Thread.
>>>>>>>
>>>>>>> – Ufuk
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>>>>> <[hidden email]> wrote:
>>>>>>> > Any help here? I think that the problem is that the JobManager
>>>>>>> > creates the
>>>>>>> > executionContext of the scheduler with
>>>>>>> >
>>>>>>> >        val executionContext = ExecutionContext.fromExecutor(new
>>>>>>> > ForkJoinPool())
>>>>>>> >
>>>>>>> > and thus the number of concurrently running threads is limited to
>>>>>>> > the number
>>>>>>> > of cores (using the default constructor of the ForkJoinPool).
>>>>>>> > What do you think?
>>>>>>> >
>>>>>>> >
>>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>>>>>>> > <[hidden email]>
>>>>>>> > wrote:
>>>>>>> >>
>>>>>>> >> Hi guys,
>>>>>>> >>
>>>>>>> >> I am trying to test a job that should run a number of tasks to
>>>>>>> >> read from a
>>>>>>> >> RDBMS using an improved JDBC connector. The connection and the
>>>>>>> >> reading run
>>>>>>> >> smoothly, but I cannot seem to be able to move above the limit of
>>>>>>> >> 8
>>>>>>> >> concurrent threads running. 8 is of course the number of cores of
>>>>>>> >> my
>>>>>>> >> machine.
>>>>>>> >>
>>>>>>> >> I have tried working around configurations and settings, but the
>>>>>>> >> Executor
>>>>>>> >> within the ExecutionContext keeps on having a parallelism of 8.
>>>>>>> >> Although, of
>>>>>>> >> course, the parallelism of the execution environment is much
>>>>>>> >> higher (in fact
>>>>>>> >> I have many more tasks to be allocated).
>>>>>>> >>
>>>>>>> >> I feel it may be an issue of the LocalMiniCluster configuration
>>>>>>> >> that may
>>>>>>> >> just override/neglect my wish for higher degree of parallelism. Is
>>>>>>> >> there a
>>>>>>> >> way for me to work around this issue?
>>>>>>> >>
>>>>>>> >> please let me know. Thanks a lot for your help! :-)
>>>>>>> >>
>>>>>>> >> saluti,
>>>>>>> >> Stefano
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: threads, parallelism and task managers

Stephan Ewen
No problem ;-)
