diff --git a/hwy/contrib/thread_pool/thread_pool.h b/hwy/contrib/thread_pool/thread_pool.h
index 5a77a4f4e7..9371277017 100644
--- a/hwy/contrib/thread_pool/thread_pool.h
+++ b/hwy/contrib/thread_pool/thread_pool.h
@@ -180,40 +180,42 @@ static inline const char* ToString(WaitType type) {
   }
 }
 
-// We want predictable struct/class sizes so we can reason about cache lines.
-#pragma pack(push, 1)
-
 // Parameters governing the main and worker thread behavior. Can be updated at
-// runtime via `SetWaitMode`. Both have copies which are carefully synchronized
-// (two-phase barrier). 32 bits leave room for two future fields. 64 bits would
-// also be fine because this does not go through futex.
+// runtime via `SetWaitMode`, which calls `SendConfig`. Both have copies which
+// are carefully synchronized. 32 bits leave room for two future fields.
+// 64 bits would also be fine because this does not go through futex.
 struct Config {  // 4 bytes
   static std::vector<Config> AllCandidates(PoolWaitMode wait_mode) {
-    std::vector<SpinType> spin_types(size_t{1}, DetectSpin());
-    // Monitor-based spin may be slower, so also try Pause.
-    if (spin_types[0] != SpinType::kPause) {
-      spin_types.push_back(SpinType::kPause);
-    }
+    std::vector<Config> candidates;
 
-    std::vector<WaitType> wait_types;
     if (wait_mode == PoolWaitMode::kSpin) {
+      std::vector<SpinType> spin_types;
+      spin_types.reserve(2);
+      spin_types.push_back(DetectSpin());
+      // Monitor-based spin may be slower, so also try Pause.
+      if (spin_types[0] != SpinType::kPause) {
+        spin_types.push_back(SpinType::kPause);
+      }
+      // All except `kBlock`.
+      std::vector<WaitType> wait_types;
       for (size_t wait = 0;; ++wait) {
         const WaitType wait_type = static_cast<WaitType>(wait);
         if (wait_type == WaitType::kSentinel) break;
        if (wait_type != WaitType::kBlock) wait_types.push_back(wait_type);
       }
-    } else {
-      wait_types.push_back(WaitType::kBlock);
-    }
-    std::vector<Config> candidates;
-    candidates.reserve(50);
-    for (const SpinType spin_type : spin_types) {
-      for (const WaitType wait_type : wait_types) {
-        candidates.emplace_back(spin_type, wait_type);
+      candidates.reserve(spin_types.size() * wait_types.size());
+      for (const SpinType spin_type : spin_types) {
+        for (const WaitType wait_type : wait_types) {
+          candidates.emplace_back(spin_type, wait_type);
+        }
       }
+    } else {
+      // kBlock does not use spin, so there is only one candidate.
+      candidates.emplace_back(SpinType::kPause, WaitType::kBlock);
     }
+
     return candidates;
   }
@@ -224,9 +226,10 @@ struct Config {  // 4 bytes
     return buf;
   }
 
-  Config() {}
   Config(SpinType spin_type, WaitType wait_type)
       : spin_type(spin_type), wait_type(wait_type) {}
+  // Workers initially spin until ThreadPool sends them their actual config.
+  Config() : Config(SpinType::kPause, WaitType::kSpinSeparate) {}
 
   SpinType spin_type;
   WaitType wait_type;
@@ -270,6 +273,7 @@ class alignas(HWY_ALIGNMENT) Worker {  // HWY_ALIGNMENT bytes
   Worker& operator=(const Worker&) = delete;
 
   size_t Index() const { return worker_; }
+  // For work stealing.
   Worker* AllWorkers() { return workers_; }
   const Worker* AllWorkers() const { return workers_; }
   size_t NumThreads() const { return num_threads_; }
@@ -277,7 +281,8 @@ class alignas(HWY_ALIGNMENT) Worker {  // HWY_ALIGNMENT bytes
   // ------------------------ Per-worker storage for `SendConfig`
 
   Config NextConfig() const { return next_config_; }
-  // For workers, but no harm if also called by main thread.
+  // Called during `SendConfig` by workers and now also the main thread. This
+  // avoids a separate `ThreadPool` member which risks going out of sync.
   void SetNextConfig(Config copy) { next_config_ = copy; }
 
   uint32_t GetExit() const { return exit_; }
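Note on the `AllCandidates` rewrite above: wait types are enumerated by walking the enum until its `kSentinel` terminator. A standalone sketch of that pattern, with a stand-in `WaitType` (the real enum, defined earlier in this header, has more context but the same sentinel-terminated shape):

    #include <cstddef>
    #include <cstdio>
    #include <vector>

    // Stand-in enum for illustration only.
    enum class WaitType { kSpin1, kSpinSeparate, kBlock, kSentinel };

    int main() {
      std::vector<WaitType> wait_types;
      // Same loop as `AllCandidates`: visit every enumerator until kSentinel,
      // skipping kBlock because it does not spin.
      for (size_t wait = 0;; ++wait) {
        const WaitType wait_type = static_cast<WaitType>(wait);
        if (wait_type == WaitType::kSentinel) break;
        if (wait_type != WaitType::kBlock) wait_types.push_back(wait_type);
      }
      printf("%zu spin-compatible wait types\n", wait_types.size());
      return 0;
    }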
@@ -321,8 +326,9 @@ class alignas(HWY_ALIGNMENT) Worker {  // HWY_ALIGNMENT bytes
 
   // ------------------------ Barrier: Main thread waits for workers
 
-  // Used for loads and UntilEqual.
+  // For use by `HasReached` and `UntilReached`.
   const std::atomic<uint32_t>& Barrier() const { return barrier_epoch_; }
+  // Setting to `epoch` signals that the worker has reached the barrier.
   void StoreBarrier(uint32_t epoch) { barrier_epoch_.store(epoch, kRel); }
 
  private:
@@ -352,8 +358,6 @@ class alignas(HWY_ALIGNMENT) Worker {  // HWY_ALIGNMENT bytes
 };
 static_assert(sizeof(Worker) == HWY_ALIGNMENT, "");
 
-#pragma pack(pop)
-
 // Creates/destroys `Worker` using preallocated storage. See comment at
 // `ThreadPool::worker_bytes_` for why we do not dynamically allocate.
 class WorkerLifecycle {  // 0 bytes
@@ -388,10 +392,9 @@ class WorkerLifecycle {  // 0 bytes
   }
 };
 
-#pragma pack(push, 1)
 // Stores arguments to `Run`: the function and range of task indices. Set by
 // the main thread, read by workers including the main thread.
-class alignas(8) Tasks {
+class Tasks {
   static constexpr auto kAcq = std::memory_order_acquire;
 
   // Signature of the (internal) function called from worker(s) for each
@@ -414,7 +417,7 @@ class alignas(8) Tasks {
   }
 
   // Assigns workers their share of `[begin, end)`. Called from the main
-  // thread; workers are initializing or spinning for a command.
+  // thread; workers are initializing or waiting for a command.
   static void DivideRangeAmongWorkers(const uint64_t begin, const uint64_t end,
                                       const Divisor64& div_workers,
                                       Worker* workers) {
@@ -524,7 +527,6 @@ class alignas(8) Tasks {
   std::atomic<const void*> opaque_;
 };
 static_assert(sizeof(Tasks) == 16 + 2 * sizeof(void*), "");
-#pragma pack(pop)
 
 // ------------------------------ Threads wait, main wakes them
 
@@ -550,8 +552,6 @@ static_assert(sizeof(Tasks) == 16 + 2 * sizeof(void*), "");
 
 // Futex: blocking reduces apparent CPU usage, but has higher wake latency.
 struct WaitBlock {
-  WaitType Type() const { return WaitType::kBlock; }
-
   // Wakes all workers by storing the current `epoch`.
   void WakeWorkers(Worker* workers, const uint32_t epoch) const {
     HWY_DASSERT(epoch != 0);
@@ -561,11 +561,12 @@ struct WaitBlock {
 
   // Waits until `WakeWorkers(_, epoch)` has been called.
   template <class Spin>
-  void UntilWoken(const Worker& worker, const Spin& /*spin*/) const {
+  size_t UntilWoken(const Worker& worker, const Spin& /*spin*/) const {
     HWY_DASSERT(worker.Index() != 0);  // main is 0
     const uint32_t epoch = worker.WorkerEpoch();
     const Worker* workers = worker.AllWorkers();
     BlockUntilDifferent(epoch - 1, workers[1].Waiter());
+    return 1;  // iterations
   }
 };
 
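`WaitBlock` relies on an epoch handshake: waiting blocks only while the waiter slot still holds `epoch - 1`, so the wake and the wait may happen in either order. A minimal sketch of that protocol, using C++20 `std::atomic::wait`/`notify_all` as a stand-in for the futex-based `BlockUntilDifferent`:

    #include <atomic>
    #include <cstdint>
    #include <cstdio>
    #include <thread>

    std::atomic<uint32_t> waiter{0};  // plays the role of workers[1].Waiter()

    int main() {
      constexpr uint32_t epoch = 1;  // epoch the worker expects to be woken for
      std::thread worker([] {
        // UntilWoken: sleep while the slot still holds the previous epoch.
        waiter.wait(epoch - 1, std::memory_order_acquire);
        printf("woken for epoch %u\n", waiter.load(std::memory_order_acquire));
      });
      // WakeWorkers: publish the new epoch, then notify.
      waiter.store(epoch, std::memory_order_release);
      waiter.notify_all();
      worker.join();
      return 0;
    }

If the store and notify happen before the worker reaches `wait`, the wait returns immediately because the value already differs, which is why no extra barrier is needed between spawning and waking.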
@@ -573,27 +574,23 @@ struct WaitBlock {
 // one cache line and thus have it in a shared state, which means the store
 // will invalidate each of them, leading to more transactions than SpinSeparate.
 struct WaitSpin1 {
-  WaitType Type() const { return WaitType::kSpin1; }
-
   void WakeWorkers(Worker* workers, const uint32_t epoch) const {
     workers[1].StoreWaiter(epoch);
   }
 
+  // Returns the number of spin-wait iterations.
   template <class Spin>
-  void UntilWoken(const Worker& worker, const Spin& spin) const {
+  size_t UntilWoken(const Worker& worker, const Spin& spin) const {
     HWY_DASSERT(worker.Index() != 0);  // main is 0
     const Worker* workers = worker.AllWorkers();
     const uint32_t epoch = worker.WorkerEpoch();
-    (void)spin.UntilEqual(epoch, workers[1].Waiter());
-    // TODO: store reps in stats.
+    return spin.UntilEqual(epoch, workers[1].Waiter());
   }
 };
 
 // Separate u32 per thread: more stores for the main thread, but each worker
 // only polls its own cache line, leading to fewer cache-coherency transactions.
 struct WaitSpinSeparate {
-  WaitType Type() const { return WaitType::kSpinSeparate; }
-
   void WakeWorkers(Worker* workers, const uint32_t epoch) const {
     for (size_t thread = 0; thread < workers->NumThreads(); ++thread) {
       workers[1 + thread].StoreWaiter(epoch);
@@ -601,11 +598,10 @@ struct WaitSpinSeparate {
   }
 
   template <class Spin>
-  void UntilWoken(const Worker& worker, const Spin& spin) const {
+  size_t UntilWoken(const Worker& worker, const Spin& spin) const {
     HWY_DASSERT(worker.Index() != 0);  // main is 0
     const uint32_t epoch = worker.WorkerEpoch();
-    (void)spin.UntilEqual(epoch, worker.Waiter());
-    // TODO: store reps in stats.
+    return spin.UntilEqual(epoch, worker.Waiter());
   }
 };
 
@@ -653,6 +649,7 @@ class Barrier {
     workers[0].StoreBarrier(epoch);  // for main thread HasReached.
 
     for (size_t i = 0; i < num_threads; ++i) {
+      // TODO: log number of spin-wait iterations.
      (void)spin.UntilEqual(epoch, workers[1 + i].Barrier());
     }
   }
@@ -680,19 +677,6 @@ class Barrier {
 //
 // For load-balancing, we use work stealing in random order.
 class alignas(HWY_ALIGNMENT) ThreadPool {
- public:
-  // This typically includes hyperthreads, hence it is a loose upper bound.
-  // -1 because these are in addition to the main thread.
-  static size_t MaxThreads() {
-    LogicalProcessorSet lps;
-    // This is OS dependent, but more accurate if available because it takes
-    // into account restrictions set by cgroups or numactl/taskset.
-    if (GetThreadAffinity(lps)) {
-      return lps.Count() - 1;
-    }
-    return static_cast<size_t>(std::thread::hardware_concurrency() - 1);
-  }
-
   // Called by `std::thread`. Could also be a lambda, but annotating with
   // `HWY_POOL_PROFILE` makes it easier to inspect the generated code.
   class ThreadFunc {
@@ -702,15 +686,14 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
     template <class Spin, class Wait>
     void operator()(const Spin& spin, const Wait& wait,
                     pool::Worker& worker) const {
-      wait.UntilWoken(worker, spin);
+      // TODO: log number of spin-wait iterations.
+      (void)wait.UntilWoken(worker, spin);
     }
   };
 
  public:
-  ThreadFunc(pool::Worker& worker, pool::Tasks& tasks, pool::Config config)
-      : worker_(worker), tasks_(tasks) {
-    worker.SetNextConfig(config);
-  }
+  ThreadFunc(pool::Worker& worker, pool::Tasks& tasks)
+      : worker_(worker), tasks_(tasks) {}
 
   HWY_POOL_PROFILE void operator()() {
     // Ensure main thread's writes are visible (synchronizes with fence in
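The `UntilWoken` overloads now return the spin-iteration count instead of discarding it, so the TODOs above can eventually log it. A hypothetical stand-in for the `Spin` policies (the real ones, selected via `DetectSpin`, use pause or monitor instructions) that shows the contract:

    #include <atomic>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    struct PauseSpin {
      // Polls until `current` equals `expected`; returns the iteration count,
      // matching the new size_t return type of UntilWoken.
      size_t UntilEqual(uint32_t expected,
                        const std::atomic<uint32_t>& current) const {
        for (size_t reps = 1;; ++reps) {
          if (current.load(std::memory_order_acquire) == expected) return reps;
          // Real policies execute a pause instruction or monitor-wait here.
        }
      }
    };

    int main() {
      std::atomic<uint32_t> waiter{1};
      const size_t reps = PauseSpin().UntilEqual(1, waiter);
      printf("equal after %zu iteration(s)\n", reps);
      return 0;
    }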
@@ -743,14 +726,38 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
     pool::Tasks& tasks_;
   };
 
+  // Used to initialize `num_threads_` from the ctor argument.
+  static size_t ClampedNumThreads(size_t num_threads) {
+    // Upper bound is required for `worker_bytes_`.
+    if (HWY_UNLIKELY(num_threads > pool::kMaxThreads)) {
+      HWY_WARN("ThreadPool: clamping num_threads %zu to %zu.", num_threads,
+               pool::kMaxThreads);
+      num_threads = pool::kMaxThreads;
+    }
+    return num_threads;
+  }
+
+ public:
+  // This typically includes hyperthreads, hence it is a loose upper bound.
+  // -1 because these are in addition to the main thread.
+  static size_t MaxThreads() {
+    LogicalProcessorSet lps;
+    // This is OS dependent, but more accurate if available because it takes
+    // into account restrictions set by cgroups or numactl/taskset.
+    if (GetThreadAffinity(lps)) {
+      return lps.Count() - 1;
+    }
+    return static_cast<size_t>(std::thread::hardware_concurrency() - 1);
+  }
+
   // `num_threads` is the number of *additional* threads to spawn, which should
   // not exceed `MaxThreads()`. Note that the main thread also performs work.
   explicit ThreadPool(size_t num_threads)
-      : have_timer_stop_(platform::HaveTimerStop(cpu100_)),
-        num_threads_(ClampedNumThreads(num_threads)),
+      : num_threads_(ClampedNumThreads(num_threads)),
         div_workers_(1 + num_threads_),
         workers_(pool::WorkerLifecycle::Init(worker_bytes_, num_threads_,
-                                             div_workers_)) {
+                                             div_workers_)),
+        have_timer_stop_(platform::HaveTimerStop(cpu100_)) {
     // Leaves the default wait mode as `kBlock`, which means futex, because
     // spinning only makes sense when threads are pinned and wake latency is
     // important, so it must explicitly be requested by calling `SetWaitMode`.
@@ -759,15 +766,16 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
       AutoTuner().SetCandidates(
          pool::Config::AllCandidates(mode));
     }
-    config_ = AutoTuner().Candidates()[0];
 
     threads_.reserve(num_threads_);
     for (size_t thread = 0; thread < num_threads_; ++thread) {
-      threads_.emplace_back(ThreadFunc(workers_[1 + thread], tasks_, config_));
+      threads_.emplace_back(ThreadFunc(workers_[1 + thread], tasks_));
     }
 
-    // No barrier is required here because wakeup works regardless of the
-    // relative order of wake and wait.
+    // Threads' `Config` defaults to spinning. Change to `kBlock` (see above).
+    // This also ensures all threads have started before we return, so that
+    // startup latency is billed to the ctor, not the first `Run`.
+    SendConfig(AutoTuner().Candidates()[0]);
   }
 
   // If we created threads, waits for them all to exit.
@@ -775,10 +783,11 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
     // There is no portable way to request threads to exit like `ExitThread` on
     // Windows, otherwise we could call that from `Run`. Instead, we must cause
     // the thread to wake up and exit. We can just use `Run`.
-    (void)RunWithoutAutotune(0, NumWorkers(),
-                             [this](uint64_t /*task*/, size_t worker) {
-                               workers_[worker].SetExit(1);
-                             });
+    (void)RunWithoutAutotune(
+        0, NumWorkers(), [this](HWY_MAYBE_UNUSED uint64_t task, size_t worker) {
+          HWY_DASSERT(task == worker);
+          workers_[worker].SetExit(1);
+        });
 
     for (std::thread& thread : threads_) {
       HWY_DASSERT(thread.joinable());
@@ -807,8 +816,8 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
            : AutoTuner().NextConfig());
   }
 
-  // For printing which are in use.
-  pool::Config config() const { return config_; }
+  // For printing which is in use.
+  pool::Config config() const { return workers_[0].NextConfig(); }
   bool AutoTuneComplete() const { return AutoTuner().Best(); }
   Span<CostDistribution> AutoTuneCosts() { return AutoTuner().Costs(); }
 
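The constructor's initializer list above is reordered to match the new member declaration order (`have_timer_stop_` now follows `workers_`). C++ always initializes members in declaration order, regardless of how the mem-initializer list is written, so keeping the two in sync avoids -Wreorder warnings and ordering surprises. A minimal illustration, unrelated to the pool itself:

    #include <cstdio>

    struct S {
      // b_ may read a_ only because a_ is declared (hence initialized) first;
      // the textual order of the initializer list does not decide this.
      explicit S(int x) : a_(x), b_(a_ + 1) {}
      int a_;
      int b_;
    };

    int main() {
      S s(1);
      printf("%d %d\n", s.a_, s.b_);  // prints: 1 2
      return 0;
    }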
@@ -863,24 +872,13 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
     } else {
       HWY_IF_CONSTEXPR(pool::kVerbosity >= 2) {
         fprintf(stderr, "Pool %3zu: %s %9lu\n", NumWorkers(),
-                config_.ToString().c_str(), t1 - t0);
+                config().ToString().c_str(), t1 - t0);
       }
       SendConfig(auto_tuner.NextConfig());
     }
   }
 
  private:
-  // Used to initialize ThreadPool::num_threads_ from its ctor argument.
-  static size_t ClampedNumThreads(size_t num_threads) {
-    // Upper bound is required for `worker_bytes_`.
-    if (HWY_UNLIKELY(num_threads > pool::kMaxThreads)) {
-      HWY_WARN("ThreadPool: clamping num_threads %zu to %zu.", num_threads,
-               pool::kMaxThreads);
-      num_threads = pool::kMaxThreads;
-    }
-    return num_threads;
-  }
-
   // Debug-only re-entrancy detection.
   void SetBusy() { HWY_DASSERT(!busy_.test_and_set()); }
   void ClearBusy() { HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) busy_.clear(); }
@@ -895,7 +893,7 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
     pool::Worker* workers = main.AllWorkers();
     HWY_DASSERT(&main == main.AllWorkers());  // main is first.
     const size_t num_threads = main.NumThreads();
-    const uint32_t epoch = main.WorkerEpoch();
+    const uint32_t epoch = main.AdvanceWorkerEpoch();
 
     HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) {
       for (size_t i = 0; i < 1 + num_threads; ++i) {
@@ -904,13 +902,11 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
     }
 
     wait.WakeWorkers(workers, epoch);
-    // Threads might still be starting up and wake up late, but we wait for
-    // them at the barrier below.
 
     // Also perform work on the main thread before the barrier.
     tasks.WorkerRun(&main);
 
-    // Spin-waits until all *threads* (not the main thread, because it already
+    // Spin-waits until all worker *threads* (not `main`, because it already
     // knows it is here) called `WorkerReached`.
     barrier.UntilReached(num_threads, workers, spin, epoch);
     HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) {
@@ -919,7 +915,7 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
       }
     }
 
-    // Threads may already be waiting `UntilWoken`, which serves as the
+    // Threads are or will soon be waiting `UntilWoken`, which serves as the
     // 'release' phase of the barrier.
   }
 };
@@ -951,11 +947,9 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
       pool::Tasks::DivideRangeAmongWorkers(begin, end, div_workers_, workers_);
     }
 
-    pool::Worker& main = workers_[0];
-    const HWY_MAYBE_UNUSED uint32_t epoch = main.AdvanceWorkerEpoch();
+    // Runs `MainWakeAndBarrier` with the first worker slot.
+    CallWithConfig(config(), MainWakeAndBarrier(), workers_[0], tasks_);
 
-    // Assign main thread the first worker slot (it used to be the last).
-    CallWithConfig(config_, MainWakeAndBarrier(), main, tasks_);
     if (is_root) {
       PROFILER_END_ROOT_RUN();
     }
@@ -966,8 +960,8 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
   // Sends `next_config` to workers:
   // - Main wakes threads using the current config.
   // - Threads copy `next_config` into their `Worker` during `WorkerRun`.
-  // - Threads notify the (unchanged) barrier and already wait for the next
-  //   wake using the new config.
+  // - Threads notify the (same) barrier and already wait for the next wake
+  //   using `next_config`.
   HWY_NOINLINE void SendConfig(pool::Config next_config) {
     (void)RunWithoutAutotune(
         0, NumWorkers(),
@@ -978,7 +972,7 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
 
     // All have woken and are, or will be, waiting per `next_config`. Now we
     // can entirely switch the main thread's config for the next wake.
-    config_ = next_config;
+    workers_[0].SetNextConfig(next_config);
   }
 
   using AutoTuneT = AutoTune<pool::Config, 50>;
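`MainWakeAndBarrier` above is the whole fork-join round trip: advance the epoch, wake workers, run the main thread's share of tasks, then spin until every worker has stored the epoch into its barrier slot. A compact sketch with stand-in names (requires C++20 for `std::atomic::wait`; the real code dispatches via `CallWithConfig` to the configured spin/wait policies):

    #include <array>
    #include <atomic>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <thread>
    #include <vector>

    constexpr size_t kThreads = 3;
    std::atomic<uint32_t> waiter{0};                        // wake flag
    std::array<std::atomic<uint32_t>, kThreads> barrier{};  // one slot each

    int main() {
      constexpr uint32_t epoch = 1;  // AdvanceWorkerEpoch() in the real code
      std::vector<std::thread> threads;
      for (size_t t = 0; t < kThreads; ++t) {
        threads.emplace_back([t] {
          waiter.wait(epoch - 1, std::memory_order_acquire);   // UntilWoken
          // ... WorkerRun: process own range, then steal ...
          barrier[t].store(epoch, std::memory_order_release);  // reached
        });
      }
      waiter.store(epoch, std::memory_order_release);  // WakeWorkers
      waiter.notify_all();
      // ... main thread also runs its share of tasks here ...
      for (size_t t = 0; t < kThreads; ++t) {  // Barrier::UntilReached
        while (barrier[t].load(std::memory_order_acquire) != epoch) {
        }
      }
      printf("all %zu workers reached epoch %u\n", kThreads, epoch);
      for (std::thread& thread : threads) thread.join();
      return 0;
    }

Because each wait compares against `epoch - 1` and each barrier slot is keyed on `epoch`, the handshake stays correct even if a worker is still starting up when the main thread wakes it.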
@@ -990,15 +984,17 @@ class alignas(HWY_ALIGNMENT) ThreadPool {
     return auto_tune_[static_cast<size_t>(wait_mode_) - 1];
   }
 
-  char cpu100_[100];
-  const bool have_timer_stop_;
   const size_t num_threads_;  // not including main thread
   const Divisor64 div_workers_;
   pool::Worker* const workers_;  // points into `worker_bytes_`
+  const bool have_timer_stop_;
+  char cpu100_[100];  // write-only for `HaveTimerStop` in ctor.
 
-  // The only mutable state:
-  pool::Tasks tasks_;  // written by `Run` and read by workers.
-  pool::Config config_;  // for use by the next `Run`. Updated via `SendConfig`.
+  // This is written by the main thread and read by workers, via reference
+  // passed to `ThreadFunc`. Padding ensures that the workers' cache lines are
+  // not unnecessarily invalidated when the main thread writes other members.
+  alignas(HWY_ALIGNMENT) pool::Tasks tasks_;
+  HWY_MAYBE_UNUSED char padding_[HWY_ALIGNMENT - sizeof(pool::Tasks)];
 
   // In debug builds, detects if functions are re-entered.
   std::atomic_flag busy_ = ATOMIC_FLAG_INIT;
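The new padding member can be sanity-checked with a standalone sketch of the idiom, using a stand-in `Tasks` and a hard-coded 64-byte line (the real code uses `HWY_ALIGNMENT` and `pool::Tasks`): aligning the hot member and padding out the rest of its line guarantees that subsequent members, which only the main thread touches, never share a cache line with the data workers poll.

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    constexpr size_t kAlign = 64;  // assumption; HWY_ALIGNMENT in the real code

    struct Tasks {  // stand-in of the same flavor: two pointers plus a range
      void* func;
      void* opaque;
      uint64_t begin;
      uint64_t end;
    };
    static_assert(sizeof(Tasks) < kAlign, "otherwise the padding underflows");

    struct Pool {
      alignas(kAlign) Tasks tasks;           // read by workers
      char padding[kAlign - sizeof(Tasks)];  // fills the rest of the line
      int main_thread_only;                  // starts on a fresh cache line
    };
    static_assert(offsetof(Pool, main_thread_only) % kAlign == 0, "");

    int main() {
      printf("main_thread_only at offset %zu\n",
             offsetof(Pool, main_thread_only));
      return 0;
    }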