diff --git a/previews/PR48/.documenter-siteinfo.json b/previews/PR48/.documenter-siteinfo.json index 145b78f3..e4191845 100644 --- a/previews/PR48/.documenter-siteinfo.json +++ b/previews/PR48/.documenter-siteinfo.json @@ -1 +1 @@ -{"documenter":{"julia_version":"1.10.0","generation_timestamp":"2024-02-10T18:27:09","documenter_version":"1.2.1"}} \ No newline at end of file +{"documenter":{"julia_version":"1.10.0","generation_timestamp":"2024-02-10T21:30:15","documenter_version":"1.2.1"}} \ No newline at end of file diff --git a/previews/PR48/examples/integration/integration/index.html b/previews/PR48/examples/integration/integration/index.html index 2b614f6b..9099a810 100644 --- a/previews/PR48/examples/integration/integration/index.html +++ b/previews/PR48/examples/integration/integration/index.html @@ -23,4 +23,4 @@ @btime trapezoidal(0, 1, $N); @btime trapezoidal_parallel(0, 1, $N);
  12.782 ms (0 allocations: 0 bytes)
   2.563 ms (37 allocations: 3.16 KiB)
-

Because the problem is trivially parallel - all threads do the same thing and don't need to communicate - we expect an ideal speedup of (close to) the number of available threads.


This page was generated using Literate.jl.

+

Because the problem is trivially parallel - all threads do the same thing and don't need to communicate - we expect an ideal speedup of (close to) the number of available threads.
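
With the timings above this works out as a quick sanity check (assuming the benchmarks were run with 5 threads, which is not stated on this page):

12.782 / 2.563  # ≈ 5.0, i.e. close to the number of available threads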


This page was generated using Literate.jl.

diff --git a/previews/PR48/examples/juliaset/juliaset/index.html b/previews/PR48/examples/juliaset/juliaset/index.html index 6b705673..8017e43f 100644 --- a/previews/PR48/examples/juliaset/juliaset/index.html +++ b/previews/PR48/examples/juliaset/juliaset/index.html @@ -59,4 +59,4 @@

Note that while this turns out to be a bit faster, it comes at the expense of many more allocations.

To quantify the impact of load balancing we can opt out of dynamic scheduling and use the StaticScheduler instead. The latter doesn't provide any form of load balancing.

using OhMyThreads: StaticScheduler
 
 @btime compute_juliaset_parallel!($img; scheduler=StaticScheduler()) samples=10 evals=3;
  63.147 ms (37 allocations: 3.26 KiB)
-

This page was generated using Literate.jl.

+

This page was generated using Literate.jl.

diff --git a/previews/PR48/examples/mc/mc/index.html b/previews/PR48/examples/mc/mc/index.html index 179266e6..ea8ccd6f 100644 --- a/previews/PR48/examples/mc/mc/index.html +++ b/previews/PR48/examples/mc/mc/index.html @@ -56,4 +56,4 @@ @btime mc($(length(idcs))) samples=10 evals=3;
  88.041 ms (0 allocations: 0 bytes)
   63.427 ms (0 allocations: 0 bytes)
-

This page was generated using Literate.jl.

+

This page was generated using Literate.jl.

diff --git a/previews/PR48/examples/tls/tls/index.html b/previews/PR48/examples/tls/tls/index.html index d2ac1347..f1d3b7ad 100644 --- a/previews/PR48/examples/tls/tls/index.html +++ b/previews/PR48/examples/tls/tls/index.html @@ -90,4 +90,4 @@ @btime matmulsums_tls_kwargs($As, $Bs; scheduler=$(DynamicScheduler(; nchunks=nthreads()))); @btime matmulsums_tls_kwargs($As, $Bs; scheduler=$(StaticScheduler()));
  576.448 ms (67 allocations: 40.01 MiB)
   574.186 ms (67 allocations: 40.01 MiB)
-

This page was generated using Literate.jl.

+

This page was generated using Literate.jl.

diff --git a/previews/PR48/index.html b/previews/PR48/index.html index 889b86c1..93680bc6 100644 --- a/previews/PR48/index.html +++ b/previews/PR48/index.html @@ -18,4 +18,4 @@ @btime mc_parallel($N; scheduler=DynamicScheduler(; nchunks=1)) # effectively using 1 thread @btime mc_parallel($N) # using all 5 threads

Timings might be something like this:

447.093 ms (7 allocations: 624 bytes)
-89.401 ms (66 allocations: 5.72 KiB)

(Check out the full Parallel Monte Carlo example if you like.)

No Transducers

Unlike most JuliaFolds2 packages, OhMyThreads.jl is not built off of Transducers.jl, nor is it a building block for Transducers.jl. Rather, it is meant to be a simpler, more maintainable, and more accessible alternative to high-level packages like, e.g., ThreadsX.jl or Folds.jl.

Acknowledgements

The idea for this package came from Carsten Bauer and Mason Protter. Check out the list of contributors for more information.

+89.401 ms (66 allocations: 5.72 KiB)
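
A quick sanity check of the speedup from these timings:

447.093 / 89.401  # ≈ 5.0, matching the 5 available threads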

(Check out the full Parallel Monte Carlo example if you like.)

No Transducers

Unlike most JuliaFolds2 packages, OhMyThreads.jl is not built off of Transducers.jl, nor is it a building block for Transducers.jl. Rather, it is meant to be a simpler, more maintainable, and more accessible alternative to high-level packages like, e.g., ThreadsX.jl or Folds.jl.

Acknowledgements

The idea for this package came from Carsten Bauer and Mason Protter. Check out the list of contributors for more information.

diff --git a/previews/PR48/refs/api/index.html b/previews/PR48/refs/api/index.html index 926186a7..90fb6e77 100644 --- a/previews/PR48/refs/api/index.html +++ b/previews/PR48/refs/api/index.html @@ -2,18 +2,18 @@ Public API · OhMyThreads.jl

Public API

Index

Exported

Functions

OhMyThreads.tmapreduceFunction
tmapreduce(f, op, A::AbstractArray...;
            [scheduler::Scheduler = DynamicScheduler()],
            [outputtype::Type = Any],
-           [init])

A multithreaded function like Base.mapreduce. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

tmapreduce(√, +, [1, 2, 3, 4, 5])

is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form

(√1 + √2) + (√3 + √4) + √5

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: forwarded to mapreduce for the task-local sequential parts of the calculation.
source
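
A hedged sketch of the keyword arguments in action (the values and scheduler choice are illustrative only, not prescriptive):

using OhMyThreads: tmapreduce, StaticScheduler

# explicit neutral element and asserted output type
tmapreduce(x -> x^2, +, 1:10; init = 0.0, outputtype = Float64)

# swap in a different scheduler
tmapreduce(√, +, [1, 2, 3, 4, 5]; scheduler = StaticScheduler())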
OhMyThreads.treduceFunction
treduce(op, A::AbstractArray...;
+           [init])

A multithreaded function like Base.mapreduce. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

tmapreduce(√, +, [1, 2, 3, 4, 5])

is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form

(√1 + √2) + (√3 + √4) + √5

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: forwarded to mapreduce for the task-local sequential parts of the calculation.
source
OhMyThreads.treduceFunction
treduce(op, A::AbstractArray...;
         [scheduler::Scheduler = DynamicScheduler()],
         [outputtype::Type = Any],
-        [init])

A multithreaded function like Base.reduce. Perform a reduction over A using the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

treduce(+, [1, 2, 3, 4, 5])

is the parallelized version of sum([1, 2, 3, 4, 5]) in the form

(1 + 2) + (3 + 4) + 5

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: forwarded to mapreduce for the task-local sequential parts of the calculation.
source
OhMyThreads.tmapFunction
tmap(f, [OutputElementType], A::AbstractArray...;
-     [scheduler::Scheduler = DynamicScheduler()])

A multithreaded function like Base.map. Creates a new container similar to A and fills it in parallel such that the ith element is equal to f(A[i]).

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.

Example:

tmap(sin, 1:10)

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
source
OhMyThreads.tmap!Function
tmap!(f, out, A::AbstractArray...;
-      [scheduler::Scheduler = DynamicScheduler()])

A multithreaded function like Base.map!. In parallel on multiple tasks, this function assigns out[i] = f(A[i]) for each index i of A and out.

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
source
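
The docstring above carries no example, so here is a minimal sketch:

using OhMyThreads: tmap!

out = zeros(10)
tmap!(sin, out, 1:10)  # afterwards out[i] == sin(i)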
OhMyThreads.tforeachFunction
tforeach(f, A::AbstractArray...;
+        [init])

A multithreaded function like Base.reduce. Perform a reduction over A using the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

treduce(+, [1, 2, 3, 4, 5])

is the parallelized version of sum([1, 2, 3, 4, 5]) in the form

(1 + 2) + (3 + 4) + 5

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: forwarded to mapreduce for the task-local sequential parts of the calculation.
source
OhMyThreads.tmapFunction
tmap(f, [OutputElementType], A::AbstractArray...;
+     [scheduler::Scheduler = DynamicScheduler()])

A multithreaded function like Base.map. Creates a new container similar to A and fills it in parallel such that the ith element is equal to f(A[i]).

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.

Example:

tmap(sin, 1:10)

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
source
OhMyThreads.tmap!Function
tmap!(f, out, A::AbstractArray...;
+      [scheduler::Scheduler = DynamicScheduler()])

A multithreaded function like Base.map!. In parallel on multiple tasks, this function assigns out[i] = f(A[i]) for each index i of A and out.

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
source
OhMyThreads.tforeachFunction
tforeach(f, A::AbstractArray...;
          [scheduler::Scheduler = DynamicScheduler()]) :: Nothing

A multithreaded function like Base.foreach. Apply f to each element of A on multiple parallel tasks, and return nothing. That is, it is the parallel equivalent of

for x in A
     f(x)
 end

Example:

tforeach(1:10) do i
     println(i^2)
-end

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
source
OhMyThreads.tcollectFunction
tcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}};
-         [scheduler::Scheduler = DynamicScheduler()])

A multithreaded function like Base.collect. Essentially just calls tmap on the generator function and inputs.

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.

Example:

tcollect(sin(i) for i in 1:10)

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
source
OhMyThreads.treducemapFunction
treducemap(op, f, A::AbstractArray...;
+end

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
source
OhMyThreads.tcollectFunction
tcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}};
+         [scheduler::Scheduler = DynamicScheduler()])

A multithreaded function like Base.collect. Essentially just calls tmap on the generator function and inputs.

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.

Example:

tcollect(sin(i) for i in 1:10)

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
source
OhMyThreads.treducemapFunction
treducemap(op, f, A::AbstractArray...;
            [scheduler::Scheduler = DynamicScheduler()],
            [outputtype::Type = Any],
-           [init])

Like tmapreduce except the order of the f and op arguments is switched. This is sometimes convenient with do-block notation. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

treducemap(+, √, [1, 2, 3, 4, 5])

is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form

(√1 + √2) + (√3 + √4) + √5

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: forwarded to mapreduce for the task-local sequential parts of the calculation.
source
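
To illustrate the do-block convenience mentioned above, a hedged sketch: because op comes first, the do block supplies the reducing operator.

using OhMyThreads: treducemap

treducemap(√, [1, 2, 3, 4, 5]) do a, b
    a + b  # op as a do block; √ is the mapping function f
end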

Schedulers

OhMyThreads.Schedulers.DynamicSchedulerType

The default dynamic scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are assigned to threads by Julia's dynamic scheduler and are non-sticky, that is, they can migrate between threads.

Generally preferred since it is flexible, can provide load balancing, and is composable with other multithreaded code.

Keyword arguments:

  • nchunks::Integer (default 2 * nthreads(threadpool)):
    • Determines the number of chunks (and thus also the number of parallel tasks).
    • Increasing nchunks can help with load balancing, but at the expense of creating more overhead. For nchunks <= nthreads() there are not enough chunks for any load balancing.
    • Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
    • Setting nchunks = 0 turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler might spawn many(!) tasks and can be very costly!
  • split::Symbol (default :batch):
    • Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements.
    • See ChunkSplitters.jl for more details and available options.
  • threadpool::Symbol (default :default):
    • Possible options are :default and :interactive.
    • The high-priority pool :interactive should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without yielding, as this can interfere with heartbeat processes.
source
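
A hedged usage sketch (the chunk count is an arbitrary illustration, not a recommendation):

using OhMyThreads: tmapreduce, DynamicScheduler
using Base.Threads: nthreads

# more chunks than threads improves load balancing for uneven workloads
tmapreduce(x -> sum(sin, 1:x), +, 1:500;
           scheduler = DynamicScheduler(; nchunks = 8 * nthreads()))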
OhMyThreads.Schedulers.StaticSchedulerType

A static low-overhead scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are statically assigned to threads up front and are made sticky, that is, they are guaranteed to stay on the assigned threads (no task migration).

Can sometimes be more performant than DynamicScheduler when the workload is (close to) uniform and, because of the lower overhead, for small workloads. It isn't well composable with other multithreaded code, though.

Keyword arguments:

  • nchunks::Integer (default nthreads()):
    • Determines the number of chunks (and thus also the number of parallel tasks).
    • Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
    • Currently, nchunks > nthreads() isn't officially supported but, for now, will fall back to nchunks = nthreads().
  • split::Symbol (default :batch):
    • Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements.
    • See ChunkSplitters.jl for more details and available options.
source
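
A hedged sketch for a (close to) uniform workload:

using OhMyThreads: tmap, StaticScheduler

# uniform per-element cost, so no load balancing is needed
tmap(sin, 1:10_000; scheduler = StaticScheduler())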
OhMyThreads.Schedulers.GreedySchedulerType

A greedy dynamic scheduler. The elements of the collection are first put into a Channel and then dynamic, non-sticky tasks are spawned to process channel content in parallel.

Note that elements are processed in a non-deterministic order, and thus a potential reducing function must be commutative in addition to being associative, or you could get incorrect results!

Can be a good choice for load-balancing slower, uneven computations, but does carry some additional overhead.

Keyword arguments:

  • ntasks::Int (default nthreads()):
    • Determines the number of parallel tasks to be spawned.
    • Setting ntasks < nthreads() is an effective way to use only a subset of the available threads.
source
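
A hedged sketch (assuming GreedyScheduler is importable from OhMyThreads like the other schedulers); note that + is commutative, as the non-deterministic processing order requires:

using OhMyThreads: tmapreduce, GreedyScheduler

tmapreduce(x -> x^2, +, 1:1_000; scheduler = GreedyScheduler())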

Non-Exported

OhMyThreads.@spawn: see StableTasks.jl
OhMyThreads.@spawnat: see StableTasks.jl
OhMyThreads.chunks: see ChunkSplitters.jl
OhMyThreads.TaskLocalValue: see TaskLocalValues.jl
+ [init])

Like tmapreduce except the order of the f and op arguments is switched. This is sometimes convenient with do-block notation. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

treducemap(+, √, [1, 2, 3, 4, 5])

is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form

(√1 + √2) + (√3 + √4) + √5

Keyword arguments:

  • scheduler::Scheduler (default DynamicScheduler()): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: forwarded to mapreduce for the task-local sequential parts of the calculation.
source

Schedulers

OhMyThreads.Schedulers.SchedulerType

Supertype for all available schedulers: DynamicScheduler, StaticScheduler, and GreedyScheduler (documented below).

source
OhMyThreads.Schedulers.DynamicSchedulerType

The default dynamic scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are assigned to threads by Julia's dynamic scheduler and are non-sticky, that is, they can migrate between threads.

Generally preferred since it is flexible, can provide load balancing, and is composable with other multithreaded code.

Keyword arguments:

  • nchunks::Integer (default 2 * nthreads(threadpool)):
    • Determines the number of chunks (and thus also the number of parallel tasks).
    • Increasing nchunks can help with load balancing, but at the expense of creating more overhead. For nchunks <= nthreads() there are not enough chunks for any load balancing.
    • Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
    • Setting nchunks = 0 turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler might spawn many(!) tasks and can be very costly!
  • split::Symbol (default :batch):
    • Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements.
    • See ChunkSplitters.jl for more details and available options.
  • threadpool::Symbol (default :default):
    • Possible options are :default and :interactive.
    • The high-priority pool :interactive should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without yielding, as this can interfere with heartbeat processes.
source
OhMyThreads.Schedulers.StaticSchedulerType

A static low-overhead scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are statically assigned to threads up front and are made sticky, that is, they are guaranteed to stay on the assigned threads (no task migration).

Can sometimes be more performant than DynamicScheduler when the workload is (close to) uniform and, because of the lower overhead, for small workloads. It isn't well composable with other multithreaded code, though.

Keyword arguments:

  • nchunks::Integer (default nthreads()):
    • Determines the number of chunks (and thus also the number of parallel tasks).
    • Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
    • Currently, nchunks > nthreads() isn't officially supported but, for now, will fall back to nchunks = nthreads().
  • split::Symbol (default :batch):
    • Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements.
    • See ChunkSplitters.jl for more details and available options.
source
OhMyThreads.Schedulers.GreedySchedulerType

A greedy dynamic scheduler. The elements of the collection are first put into a Channel and then dynamic, non-sticky tasks are spawned to process channel content in parallel.

Note that elements are processed in a non-deterministic order, and thus a potential reducing function must be commutative in addition to being associative, or you could get incorrect results!

Can be a good choice for load-balancing slower, uneven computations, but does carry some additional overhead.

Keyword arguments:

  • ntasks::Int (default nthreads()):
    • Determines the number of parallel tasks to be spawned.
    • Setting ntasks < nthreads() is an effective way to use only a subset of the available threads.
source

Non-Exported

OhMyThreads.@spawn: see StableTasks.jl
OhMyThreads.@spawnat: see StableTasks.jl
OhMyThreads.chunks: see ChunkSplitters.jl
OhMyThreads.TaskLocalValue: see TaskLocalValues.jl
diff --git a/previews/PR48/refs/internal/index.html b/previews/PR48/refs/internal/index.html index 92252b18..b17b932f 100644 --- a/previews/PR48/refs/internal/index.html +++ b/previews/PR48/refs/internal/index.html @@ -1,2 +1,2 @@ -Internal · OhMyThreads.jl
+Internal · OhMyThreads.jl
diff --git a/previews/PR48/translation/index.html b/previews/PR48/translation/index.html index 2077990f..5c4a3d34 100644 --- a/previews/PR48/translation/index.html +++ b/previews/PR48/translation/index.html @@ -43,4 +43,4 @@ data[i] = calc(i) end
# OhMyThreads: Variant 1
 data = tmap(i->calc(i), 1:10)
# OhMyThreads: Variant 2
-data = tcollect(calc(i) for i in 1:10)
+data = tcollect(calc(i) for i in 1:10)
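
If data is preallocated, an in-place variant is also conceivable (a hypothetical sketch; calc and the Float64 element type are assumptions):

# OhMyThreads: Variant 3 (in-place)
using OhMyThreads: tmap!
data = Vector{Float64}(undef, 10)
tmap!(calc, data, 1:10)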