diff --git a/BUILD b/BUILD index 063f6d8e9e..a1f4a9da8b 100644 --- a/BUILD +++ b/BUILD @@ -360,6 +360,9 @@ cc_library( cc_library( name = "thread_pool", + srcs = [ + "hwy/contrib/thread_pool/thread_pool.cc", + ], hdrs = [ "hwy/contrib/thread_pool/futex.h", "hwy/contrib/thread_pool/spin.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 76db52436a..65139b71ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -240,6 +240,7 @@ list(APPEND HWY_CONTRIB_SOURCES hwy/contrib/sort/vqsort.h hwy/contrib/thread_pool/futex.h hwy/contrib/thread_pool/spin.h + hwy/contrib/thread_pool/thread_pool.cc hwy/contrib/thread_pool/thread_pool.h hwy/contrib/thread_pool/topology.cc hwy/contrib/thread_pool/topology.h diff --git a/hwy/contrib/thread_pool/thread_pool.cc b/hwy/contrib/thread_pool/thread_pool.cc new file mode 100644 index 0000000000..18dd763f25 --- /dev/null +++ b/hwy/contrib/thread_pool/thread_pool.cc @@ -0,0 +1,31 @@ +// Copyright 2025 Google LLC +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "hwy/contrib/thread_pool/thread_pool.h" + +#include "hwy/highway_export.h" + +namespace hwy { +namespace pool { + +// TODO: move implementation here. + +HWY_DLLEXPORT Shared& Shared::Get() { + static Shared* shared = new Shared(); + return *shared; +} + +} // namespace pool +} // namespace hwy diff --git a/hwy/contrib/thread_pool/thread_pool.h b/hwy/contrib/thread_pool/thread_pool.h index 1bd891625d..7f647820b8 100644 --- a/hwy/contrib/thread_pool/thread_pool.h +++ b/hwy/contrib/thread_pool/thread_pool.h @@ -23,6 +23,7 @@ #include #include #include // snprintf +#include #include #include @@ -45,8 +46,11 @@ #include #endif -// Define to HWY_NOINLINE to see profiles of `WorkerRun*` and waits. -#define HWY_POOL_PROFILE +#if PROFILER_ENABLED +#include // std::sort + +#include "hwy/bit_set.h" +#endif namespace hwy { @@ -278,16 +282,7 @@ struct Config { // 4 bytes }; static_assert(sizeof(Config) == 4, ""); -struct Zones { - explicit Zones(Profiler& profiler) - : run_and_tune(profiler.AddZone("Pool.Run+Tune")), - run(profiler.AddZone("Pool.Run\\Tune")) {} - - profiler::ZoneHandle run_and_tune; - profiler::ZoneHandle run; -}; - -#if PROFILER_ENABLED || HWY_IDE +#if PROFILER_ENABLED // Accumulates timings and stats from main thread and workers. class Stats { @@ -355,7 +350,7 @@ class Stats { const size_t thread_idx = worker_idx - 1; HWY_DASSERT(PerThread(thread_idx, kDWait) == 0); HWY_DASSERT(PerThread(thread_idx, kWaitReps) == 0); - HWY_DASSERT(PerThread(thread_idx, kBeforeRun) == 0); + HWY_DASSERT(PerThread(thread_idx, kTBeforeRun) == 0); HWY_DASSERT(PerThread(thread_idx, kDRun) == 0); PerThread(thread_idx, kDWait) = d_wait; PerThread(thread_idx, kWaitReps) = wait_reps; @@ -542,6 +537,208 @@ class Stats { // Enables shift rather than multiplication. static_assert(sizeof(Stats) == (kMaxThreads + 1) * HWY_ALIGNMENT, "Wrong size"); +// Non-power of two to avoid 2K aliasing. 
+HWY_INLINE_VAR constexpr size_t kMaxCallers = 60;
+
+// Per-caller stats, stored in `PerCluster`.
+class CallerAccumulator {
+ public:
+  bool Any() const { return calls_ != 0; }
+
+  void Add(size_t tasks, size_t workers, bool is_root, timer::Ticks wait_before,
+           timer::Ticks elapsed) {
+    calls_++;
+    root_ += is_root;
+    workers_ += workers;
+    min_tasks_ = HWY_MIN(min_tasks_, tasks);
+    max_tasks_ = HWY_MAX(max_tasks_, tasks);
+    tasks_ += tasks;
+    wait_before_ += wait_before;
+    elapsed_ += elapsed;
+  }
+
+  void AddFrom(const CallerAccumulator& other) {
+    calls_ += other.calls_;
+    root_ += other.root_;
+    workers_ += other.workers_;
+    min_tasks_ = HWY_MIN(min_tasks_, other.min_tasks_);
+    max_tasks_ = HWY_MAX(max_tasks_, other.max_tasks_);
+    tasks_ += other.tasks_;
+    wait_before_ += other.wait_before_;
+    elapsed_ += other.elapsed_;
+  }
+
+  bool operator>(const CallerAccumulator& other) const {
+    return elapsed_ > other.elapsed_;
+  }
+
+  void PrintAndReset(const char* caller, size_t active_clusters) {
+    if (!Any()) return;
+    HWY_ASSERT(root_ <= calls_);
+    const double inv_calls = 1.0 / static_cast<double>(calls_);
+    const double pc_root = static_cast<double>(root_) * inv_calls * 100.0;
+    const double avg_workers = static_cast<double>(workers_) * inv_calls;
+    const double avg_tasks = static_cast<double>(tasks_) * inv_calls;
+    const double avg_tasks_per_worker = avg_tasks / avg_workers;
+    const double inv_freq = 1.0 / platform::InvariantTicksPerSecond();
+    const double sum_wait_before = static_cast<double>(wait_before_) * inv_freq;
+    const double avg_wait_before =
+        root_ ? sum_wait_before / static_cast<double>(root_) : 0.0;
+    const double elapsed = static_cast<double>(elapsed_) * inv_freq;
+    const double avg_elapsed = elapsed * inv_calls;
+    const double task_len = avg_elapsed / avg_tasks_per_worker;
+    printf(
+        "%40s: %7.0f x (%3.0f%%) %2zu clusters, %4.1f workers @ "
+        "%5.1f tasks (%u-%u), "
+        "%5.0f us wait, %6.1E us run (task len %6.1E us), total %6.2f s\n",
+        caller, static_cast<double>(calls_), pc_root, active_clusters,
+        avg_workers, avg_tasks_per_worker, static_cast<unsigned>(min_tasks_),
+        static_cast<unsigned>(max_tasks_), avg_wait_before * 1E6,
+        avg_elapsed * 1E6, task_len * 1E6, elapsed);
+    *this = CallerAccumulator();
+  }
+
+  // For the grand total, only print calls and elapsed because averaging the
+  // other stats is not very useful. No need to reset because this is called
+  // on a temporary.
+  void PrintTotal() {
+    if (!Any()) return;
+    HWY_ASSERT(root_ <= calls_);
+    const double elapsed =
+        static_cast<double>(elapsed_) / platform::InvariantTicksPerSecond();
+    printf("TOTAL: %7.0f x run %6.2f s\n", static_cast<double>(calls_),
+           elapsed);
+  }
+
+ private:
+  int64_t calls_ = 0;
+  int64_t root_ = 0;
+  uint64_t workers_ = 0;
+  uint64_t min_tasks_ = ~uint64_t{0};
+  uint64_t max_tasks_ = 0;
+  uint64_t tasks_ = 0;
+  // Both are wall time for root `Run`, otherwise CPU time.
+  timer::Ticks wait_before_ = 0;
+  timer::Ticks elapsed_ = 0;
+};
+static_assert(sizeof(CallerAccumulator) == 64, "");
+
+class PerCluster {
+ public:
+  CallerAccumulator& Get(size_t caller_idx) {
+    HWY_DASSERT(caller_idx < kMaxCallers);
+    callers_.Set(caller_idx);
+    return accumulators_[caller_idx];
+  }
+
+  template <class Func>
+  void ForeachCaller(Func&& func) {
+    callers_.Foreach([&](size_t caller_idx) {
+      func(caller_idx, accumulators_[caller_idx]);
+    });
+  }
+
+  // Returns indices (required for `StringTable::Name`) in descending order of
+  // elapsed time.
+ std::vector Sorted() { + std::vector vec; + vec.reserve(kMaxCallers); + ForeachCaller([&](size_t caller_idx, CallerAccumulator&) { + vec.push_back(caller_idx); + }); + std::sort(vec.begin(), vec.end(), [&](size_t a, size_t b) { + return accumulators_[a] > accumulators_[b]; + }); + return vec; + } + + // Caller takes care of resetting `accumulators_`. + void ResetBits() { callers_ = hwy::BitSet(); } + + private: + CallerAccumulator accumulators_[kMaxCallers]; + hwy::BitSet callers_; +}; + +// Type-safe wrapper. +class Caller { + public: + Caller() : idx_(0) {} // `AddCaller` never returns 0. + explicit Caller(size_t idx) : idx_(idx) { HWY_DASSERT(idx < kMaxCallers); } + size_t Idx() const { return idx_; } + + private: + size_t idx_; +}; + +// Singleton, shared by all ThreadPool. +class Shared { + public: + static HWY_DLLEXPORT Shared& Get(); // Thread-safe. + + Stopwatch MakeStopwatch() const { return Stopwatch(timer_); } + Stopwatch& LastRootEnd() { return last_root_end_; } + + // Thread-safe. Calls with the same `name` return the same `Caller`. + Caller AddCaller(const char* name) { return Caller(callers_.Add(name)); } + + PerCluster& Cluster(size_t cluster_idx) { + HWY_DASSERT(cluster_idx < kMaxClusters); + return per_cluster_[cluster_idx]; + } + + // Called from the main thread via `Profiler::PrintResults`. + void PrintAndReset() { + // Start counting pools (= one per cluster) invoked by each caller. + size_t active_clusters[kMaxCallers] = {}; + per_cluster_[0].ForeachCaller( + [&](size_t caller_idx, CallerAccumulator& acc) { + active_clusters[caller_idx] = acc.Any(); + }); + // Reduce per-cluster accumulators into the first cluster. + for (size_t cluster_idx = 1; cluster_idx < kMaxClusters; ++cluster_idx) { + per_cluster_[cluster_idx].ForeachCaller( + [&](size_t caller_idx, CallerAccumulator& acc) { + active_clusters[caller_idx] += acc.Any(); + per_cluster_[0].Get(caller_idx).AddFrom(acc); + acc = CallerAccumulator(); + }); + per_cluster_[cluster_idx].ResetBits(); + } + + CallerAccumulator total; + for (size_t caller_idx : per_cluster_[0].Sorted()) { + CallerAccumulator& acc = per_cluster_[0].Get(caller_idx); + total.AddFrom(acc); // must be before PrintAndReset. + acc.PrintAndReset(callers_.Name(caller_idx), active_clusters[caller_idx]); + } + total.PrintTotal(); + per_cluster_[0].ResetBits(); + } + + private: + Shared() // called via Get(). + : last_root_end_(timer_), + send_config(callers_.Add("SendConfig")), + dtor(callers_.Add("PoolDtor")), + print_stats(callers_.Add("PrintStats")) { + Profiler::Get().AddFunc(this, [this]() { PrintAndReset(); }); + // Can skip `RemoveFunc` because the singleton never dies. + } + + const Timer timer_; + Stopwatch last_root_end_; + + PerCluster per_cluster_[kMaxClusters]; + StringTable callers_; + + public: + // Returned from `callers_.Add`: + Caller send_config; + Caller dtor; + Caller print_stats; +}; + #else struct Stats { @@ -555,18 +752,29 @@ struct Stats { void Reset(size_t = kMaxThreads) {} }; -#endif // PROFILER_ENABLED +struct Caller {}; + +class Shared { + public: + static HWY_DLLEXPORT Shared& Get(); // Thread-safe. + + Stopwatch MakeStopwatch() const { return Stopwatch(timer_); } + + Caller AddCaller(const char*) { return Caller(); } -// For shorter argument lists. Used by `Worker`, owned by `ThreadPool`. 
-struct Shared { - Shared() : profiler(Profiler::Get()), zones(profiler) {} + private: + Shared() {} + + const Timer timer_; - Profiler& profiler; - const Timer timer; - const pool::Zones zones; - alignas(HWY_ALIGNMENT) Stats stats; + public: + Caller send_config; + Caller dtor; + Caller print_stats; }; +#endif // PROFILER_ENABLED + // Per-worker state used by both main and worker threads. `ThreadFunc` // (threads) and `ThreadPool` (main) have a few additional members of their own. class alignas(HWY_ALIGNMENT) Worker { // HWY_ALIGNMENT bytes @@ -593,12 +801,11 @@ class alignas(HWY_ALIGNMENT) Worker { // HWY_ALIGNMENT bytes public: Worker(const size_t worker, const size_t num_threads, const PoolWorkerMapping mapping, const Divisor64& div_workers, - Shared& shared) + const Stopwatch& stopwatch) : workers_(this - worker), worker_(worker), num_threads_(num_threads), - shared_(shared), - stopwatch_(shared_.timer), + stopwatch_(stopwatch), // If `num_threads == 0`, we might be in an inner pool and must use // the `global_idx` we are currently running on. global_idx_(num_threads == 0 ? Profiler::GlobalIdx() : mapping(worker)), @@ -623,7 +830,7 @@ class alignas(HWY_ALIGNMENT) Worker { // HWY_ALIGNMENT bytes HWY_IF_CONSTEXPR(PROFILER_ENABLED) { if (HWY_LIKELY(OwnsGlobalIdx())) { - shared_.profiler.ReserveWorker(global_idx_); + Profiler::Get().ReserveWorker(global_idx_); } } } @@ -631,7 +838,7 @@ class alignas(HWY_ALIGNMENT) Worker { // HWY_ALIGNMENT bytes ~Worker() { HWY_IF_CONSTEXPR(PROFILER_ENABLED) { if (HWY_LIKELY(OwnsGlobalIdx())) { - shared_.profiler.FreeWorker(global_idx_); + Profiler::Get().FreeWorker(global_idx_); } } } @@ -647,9 +854,7 @@ class alignas(HWY_ALIGNMENT) Worker { // HWY_ALIGNMENT bytes size_t NumThreads() const { return num_threads_; } size_t GlobalIdx() const { return global_idx_; } - Stopwatch MakeStopwatch() { return Stopwatch(shared_.timer); } - Profiler& GetProfiler() const { return shared_.profiler; } - Stats& GetStats() const { return shared_.stats; } + size_t ClusterIdx() const { return cluster_idx_; } void SetStartTime() { stopwatch_.Reset(); } timer::Ticks ElapsedTime() { return stopwatch_.Elapsed(); } @@ -716,7 +921,6 @@ class alignas(HWY_ALIGNMENT) Worker { // HWY_ALIGNMENT bytes const size_t worker_; const size_t num_threads_; - Shared& shared_; // must be before global_idx_. Stopwatch stopwatch_; // Reset by `SetStartTime`. const size_t global_idx_; const size_t cluster_idx_; @@ -749,11 +953,11 @@ class WorkerLifecycle { // 0 bytes static Worker* Init(uint8_t* storage, size_t num_threads, PoolWorkerMapping mapping, const Divisor64& div_workers, Shared& shared) { - Worker* workers = - new (storage) Worker(0, num_threads, mapping, div_workers, shared); + Worker* workers = new (storage) + Worker(0, num_threads, mapping, div_workers, shared.MakeStopwatch()); for (size_t worker = 1; worker <= num_threads; ++worker) { - new (Addr(storage, worker)) - Worker(worker, num_threads, mapping, div_workers, shared); + new (Addr(storage, worker)) Worker(worker, num_threads, mapping, + div_workers, shared.MakeStopwatch()); // Ensure pointer arithmetic is the same (will be used in Destroy). HWY_DASSERT(reinterpret_cast(workers + worker) == reinterpret_cast(Addr(storage, worker))); @@ -829,17 +1033,18 @@ class Tasks { } // Runs the worker's assigned range of tasks, plus work stealing if needed. 
- HWY_POOL_PROFILE void WorkerRun(Worker* worker) const { + void WorkerRun(Worker* worker, const Shared& shared, Stats& stats) const { if (NumTasks() > worker->NumThreads() + 1) { - WorkerRunDynamic(worker); + WorkerRunDynamic(worker, shared, stats); } else { - WorkerRunStatic(worker); + WorkerRunStatic(worker, shared, stats); } } private: // Special case for <= 1 task per worker, where stealing is unnecessary. - void WorkerRunStatic(Worker* worker) const { + void WorkerRunStatic(Worker* worker, const Shared& shared, + Stats& stats) const { const uint64_t begin = begin_.load(kAcq); const uint64_t end = end_.load(kAcq); HWY_DASSERT(begin <= end); @@ -850,9 +1055,9 @@ class Tasks { if (HWY_LIKELY(task < end)) { const void* opaque = Opaque(); const RunFunc func = Func(); - Stopwatch stopwatch = worker->MakeStopwatch(); + Stopwatch stopwatch = shared.MakeStopwatch(); func(opaque, task, index); - worker->GetStats().NotifyRunStatic(index, stopwatch.Elapsed()); + stats.NotifyRunStatic(index, stopwatch.Elapsed()); } } @@ -868,7 +1073,8 @@ class Tasks { // and perform work from others, as if they were that worker. This deals with // imbalances as they arise, but care is required to reduce contention. We // randomize the order in which threads choose victims to steal from. - HWY_POOL_PROFILE void WorkerRunDynamic(Worker* worker) const { + void WorkerRunDynamic(Worker* worker, const Shared& shared, + Stats& stats) const { Worker* workers = worker->AllWorkers(); const size_t index = worker->Index(); const RunFunc func = Func(); @@ -893,7 +1099,7 @@ class Tasks { hwy::Pause(); // Reduce coherency traffic while stealing. break; } - Stopwatch stopwatch = worker->MakeStopwatch(); + Stopwatch stopwatch = shared.MakeStopwatch(); // Pass the index we are actually running on; this is important // because it is the TLS index for user code. func(opaque, task, index); @@ -902,8 +1108,7 @@ class Tasks { sum_d_func += stopwatch.Elapsed(); } } - worker->GetStats().NotifyRunDynamic(index, sum_tasks, sum_stolen, - sum_d_func); + stats.NotifyRunDynamic(index, sum_tasks, sum_stolen, sum_d_func); } size_t NumTasks() const { @@ -1116,7 +1321,7 @@ class alignas(HWY_ALIGNMENT) ThreadPool { PoolWorkerMapping mapping = PoolWorkerMapping()) : num_threads_(ClampedNumThreads(num_threads)), div_workers_(1 + num_threads_), - shared_(), // on first call, calls ReserveWorker(0)! + shared_(pool::Shared::Get()), // on first call, calls ReserveWorker(0)! workers_(pool::WorkerLifecycle::Init(worker_bytes_, num_threads_, mapping, div_workers_, shared_)) { // Leaves the default wait mode as `kBlock`, which means futex, because @@ -1130,12 +1335,13 @@ class alignas(HWY_ALIGNMENT) ThreadPool { // Skip empty pools because they do not update stats anyway. if (num_threads_ > 0) { - shared_.profiler.AddFunc(this, [this]() { PrintStats(); }); + Profiler::Get().AddFunc(this, [this]() { PrintStats(); }); } threads_.reserve(num_threads_); for (size_t thread = 0; thread < num_threads_; ++thread) { - threads_.emplace_back(ThreadFunc(workers_[1 + thread], tasks_, shared_)); + threads_.emplace_back( + ThreadFunc(workers_[1 + thread], tasks_, shared_, stats_)); } // Threads' `Config` defaults to spinning. Change to `kBlock` (see above). @@ -1150,7 +1356,8 @@ class alignas(HWY_ALIGNMENT) ThreadPool { // Windows, otherwise we could call that from `Run`. Instead, we must cause // the thread to wake up and exit. We can just use `Run`. 
(void)RunWithoutAutotune( - 0, NumWorkers(), [this](HWY_MAYBE_UNUSED uint64_t task, size_t worker) { + 0, NumWorkers(), shared_.dtor, + [this](HWY_MAYBE_UNUSED uint64_t task, size_t worker) { HWY_DASSERT(task == worker); workers_[worker].SetExit(Exit::kThread); }); @@ -1161,7 +1368,7 @@ class alignas(HWY_ALIGNMENT) ThreadPool { } if (num_threads_ > 0) { - shared_.profiler.RemoveFunc(this); + Profiler::Get().RemoveFunc(this); } pool::WorkerLifecycle::Destroy(workers_, num_threads_); @@ -1192,6 +1399,10 @@ class alignas(HWY_ALIGNMENT) ThreadPool { bool AutoTuneComplete() const { return AutoTuner().Best(); } Span AutoTuneCosts() { return AutoTuner().Costs(); } + static pool::Caller AddCaller(const char* name) { + return pool::Shared::Get().AddCaller(name); + } + // parallel-for: Runs `closure(task, worker)` on workers for every `task` in // `[begin, end)`. Note that the unit of work should be large enough to // amortize the function call overhead, but small enough that each worker @@ -1200,24 +1411,20 @@ class alignas(HWY_ALIGNMENT) ThreadPool { // Not thread-safe - concurrent parallel-for in the same `ThreadPool` are // forbidden unless `NumWorkers() == 1` or `end <= begin + 1`. template - void Run(uint64_t begin, uint64_t end, const Closure& closure) { - const profiler::Zone zone(workers_[0].GetProfiler(), - workers_[0].GlobalIdx(), - shared_.zones.run_and_tune); - (void)zone; - + void Run(uint64_t begin, uint64_t end, pool::Caller caller, + const Closure& closure) { AutoTuneT& auto_tuner = AutoTuner(); // Already finished tuning: run without time measurement. if (HWY_LIKELY(auto_tuner.Best())) { // Don't care whether threads ran, we are done either way. - (void)RunWithoutAutotune(begin, end, closure); + (void)RunWithoutAutotune(begin, end, caller, closure); return; } // Not yet finished: measure time and notify autotuner. - Stopwatch stopwatch(shared_.timer); + Stopwatch stopwatch(shared_.MakeStopwatch()); // Skip update if threads didn't actually run. - if (!RunWithoutAutotune(begin, end, closure)) return; + if (!RunWithoutAutotune(begin, end, caller, closure)) return; auto_tuner.NotifyCost(stopwatch.Elapsed()); pool::Config next = auto_tuner.NextConfig(); // may be overwritten below @@ -1249,14 +1456,19 @@ class alignas(HWY_ALIGNMENT) ThreadPool { SendConfig(next); } + // Backward-compatible version without Caller. + template + void Run(uint64_t begin, uint64_t end, const Closure& closure) { + Run(begin, end, pool::Caller(), closure); + } + private: // Called via `CallWithConfig`. struct MainWakeAndBarrier { template - HWY_POOL_PROFILE void operator()(const Spin& spin, const Wait& wait, - pool::Worker& main, - const pool::Tasks& tasks, - pool::Shared& shared) const { + void operator()(const Spin& spin, const Wait& wait, pool::Worker& main, + const pool::Tasks& tasks, const pool::Shared& shared, + pool::Stats& stats) const { const pool::Barrier barrier; pool::Worker* workers = main.AllWorkers(); HWY_DASSERT(&main == main.AllWorkers()); // main is first. @@ -1269,21 +1481,21 @@ class alignas(HWY_ALIGNMENT) ThreadPool { } } - Stopwatch stopwatch(shared.timer); + Stopwatch stopwatch(shared.MakeStopwatch()); const timer::Ticks t_before_wake = stopwatch.Origin(); wait.WakeWorkers(workers, epoch); const timer::Ticks d_wake = stopwatch.Elapsed(); // Also perform work on the main thread before the barrier. 
- tasks.WorkerRun(&main); + tasks.WorkerRun(&main, shared, stats); const timer::Ticks d_run = stopwatch.Elapsed(); // Spin-waits until all worker *threads* (not `main`, because it already // knows it is here) called `WorkerReached`. barrier.UntilReached(num_threads, workers, spin, epoch); const timer::Ticks d_barrier = stopwatch.Elapsed(); - shared.stats.NotifyMainRun(main.NumThreads(), t_before_wake, d_wake, - d_run, d_barrier); + stats.NotifyMainRun(main.NumThreads(), t_before_wake, d_wake, d_run, + d_barrier); HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) { for (size_t i = 0; i < 1 + num_threads; ++i) { @@ -1296,29 +1508,29 @@ class alignas(HWY_ALIGNMENT) ThreadPool { } }; - // Called by `std::thread`. Could also be a lambda, but annotating with - // `HWY_POOL_PROFILE` makes it easier to inspect the generated code. + // Called by `std::thread`. Could also be a lambda. class ThreadFunc { // Functor called by `CallWithConfig`. Loops until `SendConfig` changes the // Spin or Wait policy or the pool is destroyed. struct WorkerLoop { template void operator()(const Spin& spin, const Wait& wait, pool::Worker& worker, - pool::Tasks& tasks, pool::Shared& shared) const { + pool::Tasks& tasks, const pool::Shared& shared, + pool::Stats& stats) const { do { // Main worker also calls this, so their epochs match. const uint32_t epoch = worker.AdvanceWorkerEpoch(); - Stopwatch stopwatch = worker.MakeStopwatch(); + Stopwatch stopwatch(shared.MakeStopwatch()); const size_t wait_reps = wait.UntilWoken(worker, spin); const timer::Ticks d_wait = stopwatch.Elapsed(); const timer::Ticks t_before_run = stopwatch.Origin(); - tasks.WorkerRun(&worker); + tasks.WorkerRun(&worker, shared, stats); const timer::Ticks d_run = stopwatch.Elapsed(); - shared.stats.NotifyThreadRun(worker.Index(), d_wait, wait_reps, - t_before_run, d_run); + stats.NotifyThreadRun(worker.Index(), d_wait, wait_reps, t_before_run, + d_run); // Notify barrier after `WorkerRun`. Note that we cannot send an // after-barrier timestamp, see above. @@ -1329,10 +1541,11 @@ class alignas(HWY_ALIGNMENT) ThreadPool { }; public: - ThreadFunc(pool::Worker& worker, pool::Tasks& tasks, pool::Shared& shared) - : worker_(worker), tasks_(tasks), shared_(shared) {} + ThreadFunc(pool::Worker& worker, pool::Tasks& tasks, + const pool::Shared& shared, pool::Stats& stats) + : worker_(worker), tasks_(tasks), shared_(shared), stats_(stats) {} - HWY_POOL_PROFILE void operator()() { + void operator()() { // Ensure main thread's writes are visible (synchronizes with fence in // `WorkerLifecycle::Init`). std::atomic_thread_fence(std::memory_order_acquire); @@ -1341,7 +1554,8 @@ class alignas(HWY_ALIGNMENT) ThreadPool { SetThreadName("worker%03zu", static_cast(worker_.Index() - 1)); worker_.SetStartTime(); - worker_.GetProfiler().SetGlobalIdx(worker_.GlobalIdx()); + Profiler& profiler = Profiler::Get(); + profiler.SetGlobalIdx(worker_.GlobalIdx()); // No Zone here because it would only exit after `GetExit`, which may be // after the main thread's `PROFILER_END_ROOT_RUN`, and thus too late to // be counted. Instead, `ProfilerFunc` records the elapsed time. @@ -1350,14 +1564,14 @@ class alignas(HWY_ALIGNMENT) ThreadPool { for (;;) { // Uses the initial config, or the last one set during WorkerRun. CallWithConfig(worker_.NextConfig(), WorkerLoop(), worker_, tasks_, - shared_); + shared_, stats_); // Exit or reset the flag and return to WorkerLoop with a new config. 
if (worker_.GetExit() == Exit::kThread) break; worker_.SetExit(Exit::kNone); } - worker_.GetProfiler().SetGlobalIdx(~size_t{0}); + profiler.SetGlobalIdx(~size_t{0}); // Defer `FreeWorker` until workers are destroyed to ensure the profiler // is not still using the worker. @@ -1366,14 +1580,15 @@ class alignas(HWY_ALIGNMENT) ThreadPool { private: pool::Worker& worker_; pool::Tasks& tasks_; - pool::Shared& shared_; + const pool::Shared& shared_; + pool::Stats& stats_; }; void PrintStats() { // Total run time from all non-main threads. std::atomic sum_thread_elapsed{0}; (void)RunWithoutAutotune( - 0, NumWorkers(), + 0, NumWorkers(), shared_.print_stats, [this, &sum_thread_elapsed](HWY_MAYBE_UNUSED uint64_t task, size_t worker) { HWY_DASSERT(task == worker); @@ -1384,18 +1599,15 @@ class alignas(HWY_ALIGNMENT) ThreadPool { }); const timer::Ticks thread_total = sum_thread_elapsed.load(std::memory_order_acquire); - shared_.stats.PrintAndReset(num_threads_, thread_total); + stats_.PrintAndReset(num_threads_, thread_total); } // Returns whether threads were used. If not, there is no need to update // the autotuner config. template - bool RunWithoutAutotune(uint64_t begin, uint64_t end, + bool RunWithoutAutotune(uint64_t begin, uint64_t end, pool::Caller caller, const Closure& closure) { pool::Worker& main = workers_[0]; - const profiler::Zone zone(main.GetProfiler(), main.GlobalIdx(), - shared_.zones.run); - (void)zone; const size_t num_tasks = static_cast(end - begin); const size_t num_workers = NumWorkers(); @@ -1410,7 +1622,13 @@ class alignas(HWY_ALIGNMENT) ThreadPool { } busy_.Set(); + +#if PROFILER_ENABLED const bool is_root = PROFILER_IS_ROOT_RUN(); + Stopwatch stopwatch(shared_.MakeStopwatch()); + const timer::Ticks wait_before = + is_root ? shared_.LastRootEnd().Elapsed() : 0; +#endif tasks_.Set(begin, end, closure); @@ -1420,11 +1638,19 @@ class alignas(HWY_ALIGNMENT) ThreadPool { } // Runs `MainWakeAndBarrier` with the first worker slot. - CallWithConfig(config(), MainWakeAndBarrier(), main, tasks_, shared_); + CallWithConfig(config(), MainWakeAndBarrier(), main, tasks_, shared_, + stats_); +#if PROFILER_ENABLED + pool::CallerAccumulator& acc = + shared_.Cluster(main.ClusterIdx()).Get(caller.Idx()); + acc.Add(num_tasks, num_workers, is_root, wait_before, stopwatch.Elapsed()); if (is_root) { PROFILER_END_ROOT_RUN(); + shared_.LastRootEnd().Reset(); } +#endif + busy_.Clear(); return true; } @@ -1436,7 +1662,7 @@ class alignas(HWY_ALIGNMENT) ThreadPool { // using `next_config`. HWY_NOINLINE void SendConfig(pool::Config next_config) { (void)RunWithoutAutotune( - 0, NumWorkers(), + 0, NumWorkers(), shared_.send_config, [this, next_config](HWY_MAYBE_UNUSED uint64_t task, size_t worker) { HWY_DASSERT(task == worker); // one task per worker workers_[worker].SetNextConfig(next_config); @@ -1459,9 +1685,11 @@ class alignas(HWY_ALIGNMENT) ThreadPool { const size_t num_threads_; // not including main thread const Divisor64 div_workers_; - pool::Shared shared_; // passed to Worker ctor + pool::Shared& shared_; pool::Worker* const workers_; // points into `worker_bytes_` + alignas(HWY_ALIGNMENT) pool::Stats stats_; + // This is written by the main thread and read by workers, via reference // passed to `ThreadFunc`. Padding ensures that the workers' cache lines are // not unnecessarily invalidated when the main thread writes other members. 
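Reviewer note: a minimal usage sketch of the per-caller accounting introduced above. `MyStage` and the task count are illustrative only; `ThreadPool::AddCaller` and the `Run` overload taking a `pool::Caller` are the APIs added in this patch.

```cpp
#include <cstdint>

#include "hwy/contrib/thread_pool/thread_pool.h"

// Sketch: attribute all Run() calls from this call site to one named Caller.
void MyStage(hwy::ThreadPool& pool) {  // hypothetical call site
  // Registering once (e.g. in a static) avoids the name lookup on every Run;
  // calls with the same name return the same Caller.
  static const hwy::pool::Caller caller = hwy::ThreadPool::AddCaller("MyStage");
  pool.Run(0, 100, caller, [](uint64_t task, size_t worker) {
    (void)task;
    (void)worker;  // ... per-task work ...
  });
  // Existing call sites keep compiling via the overload without a Caller,
  // which forwards a default-constructed pool::Caller.
}
```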
diff --git a/hwy/contrib/thread_pool/thread_pool_test.cc b/hwy/contrib/thread_pool/thread_pool_test.cc index ba5fb02503..748535fe0c 100644 --- a/hwy/contrib/thread_pool/thread_pool_test.cc +++ b/hwy/contrib/thread_pool/thread_pool_test.cc @@ -237,7 +237,7 @@ TEST(ThreadPoolTest, TestWaiter) { auto storage = hwy::AllocateAligned(num_workers * sizeof(Worker)); HWY_ASSERT(storage); const Divisor64 div_workers(num_workers); - Shared shared; // already calls ReserveWorker(0). + Shared& shared = Shared::Get(); // already calls ReserveWorker(0). for (WaitType wait_type : {WaitType::kBlock, WaitType::kSpin1, WaitType::kSpinSeparate}) { @@ -269,7 +269,8 @@ TEST(ThreadPoolTest, TestTasks) { auto storage = hwy::AllocateAligned(num_workers * sizeof(Worker)); HWY_ASSERT(storage); const Divisor64 div_workers(num_workers); - Shared shared; + Shared& shared = Shared::Get(); + Stats stats; Worker* workers = WorkerLifecycle::Init( storage.get(), num_threads, PoolWorkerMapping(), div_workers, shared); @@ -293,7 +294,7 @@ TEST(ThreadPoolTest, TestTasks) { Tasks::DivideRangeAmongWorkers(begin, end, div_workers, workers); // The `tasks < workers` special case requires running by all workers. for (size_t worker = 0; worker < num_workers; ++worker) { - tasks.WorkerRun(workers + worker); + tasks.WorkerRun(workers + worker, shared, stats); } // Ensure all tasks were run. diff --git a/hwy/profiler.cc b/hwy/profiler.cc index c7a6b0d585..8855d48366 100644 --- a/hwy/profiler.cc +++ b/hwy/profiler.cc @@ -139,8 +139,8 @@ Profiler::Profiler() { // Even if disabled, we want to export the symbol. HWY_DLLEXPORT Profiler& Profiler::Get() { - static Profiler profiler; - return profiler; + static Profiler* profiler = new Profiler(); + return *profiler; } } // namespace hwy diff --git a/hwy/profiler.h b/hwy/profiler.h index 8e7472b2bb..ba1b7fa6a4 100644 --- a/hwy/profiler.h +++ b/hwy/profiler.h @@ -17,9 +17,12 @@ #include #include +#include // strcmp, strlen +#include #include +#include "hwy/base.h" #include "hwy/highway_export.h" // High precision, low overhead time measurements. Returns exact call counts and @@ -49,15 +52,12 @@ #if PROFILER_ENABLED #include -#include // strcmp, strlen #include // std::sort -#include #include #include #include "hwy/aligned_allocator.h" -#include "hwy/base.h" #include "hwy/bit_set.h" #include "hwy/timer.h" #endif // PROFILER_ENABLED @@ -78,6 +78,75 @@ enum class ProfilerFlags : uint32_t { // Called during `PrintResults` to print results from other modules. using ProfilerFunc = std::function; +template +class StringTable { + static constexpr std::memory_order kRelaxed = std::memory_order_relaxed; + static constexpr std::memory_order kAcq = std::memory_order_acquire; + static constexpr std::memory_order kRel = std::memory_order_release; + + public: + // Returns a copy of the `name` passed to `Add` that returned the + // given `idx`. + const char* Name(size_t idx) const { + // `kRelaxed` is sufficient because pointers are immutable once published + // via a `kRelease` store. + return ptrs_[idx].load(kRelaxed); + } + + // Returns `idx < kMaxStrings`. Can be called concurrently. Calls with the + // same `name` return the same `idx`. + size_t Add(const char* name) { + // Linear search if it already exists. `kAcq` ensures we see prior stores. 
+ const size_t num_strings = next_ptr_.load(kAcq); + HWY_ASSERT(num_strings < kMaxStrings); + for (size_t idx = 1; idx < num_strings; ++idx) { + const char* existing = ptrs_[idx].load(kAcq); + // `next_ptr_` was published after writing `ptr_`, hence it is non-null. + HWY_ASSERT(existing != nullptr); + if (HWY_UNLIKELY(!strcmp(existing, name))) { + return idx; + } + } + + // Copy `name` into `chars_` before publishing the pointer. + const size_t len = strlen(name) + 1; + const size_t pos = next_char_.fetch_add(len, kRelaxed); + HWY_ASSERT(pos + len <= sizeof(chars_)); + strcpy(chars_ + pos, name); // NOLINT + + for (;;) { + size_t idx = next_ptr_.load(kRelaxed); + HWY_ASSERT(idx < kMaxStrings); + + // Attempt to claim the next `idx` via CAS. + const char* expected = nullptr; + if (HWY_LIKELY(ptrs_[idx].compare_exchange_weak(expected, chars_ + pos, + kRel, kRelaxed))) { + // Publish the new count and make the `ptrs_` write visible. + next_ptr_.store(idx + 1, kRel); + HWY_DASSERT(!strcmp(Name(idx), name)); + return idx; + } + + // We lost the race. `expected` has been updated. + if (HWY_UNLIKELY(!strcmp(expected, name))) { + // Done, another thread added the same name. Note that we waste the + // extra space in `chars_`, which is fine because it is rare. + HWY_DASSERT(!strcmp(Name(idx), name)); + return idx; + } + + // Other thread added a different name. Retry with the next slot. + } + } + + private: + std::atomic ptrs_[kMaxStrings]; + std::atomic next_ptr_{1}; // next idx + std::atomic next_char_{0}; + char chars_[kMaxStrings * 55]; +}; + #if PROFILER_ENABLED // Implementation details. @@ -141,72 +210,21 @@ class ZoneHandle { // Storage for zone names. class Zones { - static constexpr std::memory_order kRelaxed = std::memory_order_relaxed; - static constexpr std::memory_order kAcq = std::memory_order_acquire; - static constexpr std::memory_order kRel = std::memory_order_release; - public: // Returns a copy of the `name` passed to `AddZone` that returned the // given `zone`. const char* Name(ZoneHandle zone) const { - // `kRelaxed` is sufficient because pointers are immutable once published - // via a `kRelease` store. - return ptrs_[zone.ZoneIdx()].load(kRelaxed); + return strings_.Name(zone.ZoneIdx()); } - // Thread-safe. + // Can be called concurrently. Calls with the same `name` return the same + // `ZoneHandle.ZoneIdx()`. ZoneHandle AddZone(const char* name, ProfilerFlags flags) { - // Linear search if it already exists. `kAcq` ensures we see prior stores. - const size_t num_zones = next_ptr_.load(kAcq); - HWY_ASSERT(num_zones < kMaxZones); - for (size_t zone_idx = 1; zone_idx < num_zones; ++zone_idx) { - const char* existing = ptrs_[zone_idx].load(kAcq); - // `next_ptr_` was published after writing `ptr_`, hence it is non-null. - HWY_ASSERT(existing != nullptr); - if (HWY_UNLIKELY(!strcmp(existing, name))) { - return ZoneHandle(zone_idx, flags); - } - } - - // Copy `name` into `chars_` before publishing the pointer. - const size_t len = strlen(name) + 1; - const size_t pos = next_char_.fetch_add(len, kRelaxed); - HWY_ASSERT(pos + len <= sizeof(chars_)); - strcpy(chars_ + pos, name); // NOLINT - - for (;;) { - size_t zone_idx = next_ptr_.load(kRelaxed); - HWY_ASSERT(zone_idx < kMaxZones); - - // Attempt to claim the next `zone_idx` via CAS. - const char* expected = nullptr; - if (HWY_LIKELY(ptrs_[zone_idx].compare_exchange_weak( - expected, chars_ + pos, kRel, kRelaxed))) { - // Publish the new count and make the `ptrs_` write visible. 
- next_ptr_.store(zone_idx + 1, kRel); - const ZoneHandle zone(zone_idx, flags); - HWY_DASSERT(!strcmp(Name(zone), name)); - return zone; - } - - // We lost the race. `expected` has been updated. - if (HWY_UNLIKELY(!strcmp(expected, name))) { - // Done, another thread added the same zone. Note that we waste the - // extra space in `chars_`, which is fine because it is rare. - const ZoneHandle zone(zone_idx, flags); - HWY_DASSERT(!strcmp(Name(zone), name)); - return zone; - } - - // Other thread added a different zone. Retry with the next slot. - } + return ZoneHandle(strings_.Add(name), flags); } private: - std::atomic ptrs_[kMaxZones]; - std::atomic next_ptr_{1}; // next zone_idx - char chars_[kMaxZones * 70]; - std::atomic next_char_{0}; + StringTable strings_; }; // Allows other classes such as `ThreadPool` to register/unregister a function diff --git a/meson.build b/meson.build index 5df06033e2..08bec3addc 100644 --- a/meson.build +++ b/meson.build @@ -150,6 +150,7 @@ hwy_contrib_headers = files( hwy_contrib_sources = files( 'hwy/contrib/image/image.cc', 'hwy/contrib/sort/vqsort.cc', + 'hwy/contrib/thread_pool/thread_pool.cc', 'hwy/contrib/thread_pool/topology.cc', # plus all of the vqsort_*.cc.... # note meson doesn't directly support glob (by design).
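Appendix (not part of the diff): a self-contained sketch of the `StringTable` contract that both `Zones::AddZone` and `pool::Shared::AddCaller` now rely on — deduplicated, stable indices that round-trip through `Name`. It assumes the template parameter is the maximum string count (`kMaxStrings`) and that the class sits at `hwy::` namespace scope as in the profiler.h hunk; the capacity 64 and the strings are arbitrary.

```cpp
#include <cassert>
#include <cstring>

#include "hwy/profiler.h"

int main() {
  // Illustrative only: exercises the contract used by pool::Shared::AddCaller
  // and profiler::Zones::AddZone. Static storage so the atomic slots start
  // out null, as the CAS in Add() expects.
  static hwy::StringTable<64> table;
  const size_t a = table.Add("SendConfig");
  const size_t b = table.Add("PoolDtor");
  const size_t a2 = table.Add("SendConfig");     // duplicate name, same index
  assert(a != 0 && b != 0 && a != b);            // index 0 is never handed out
  assert(a == a2);
  assert(!strcmp(table.Name(a), "SendConfig"));  // Name round-trips
  return 0;
}
```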