You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
386 lines
13 KiB
386 lines
13 KiB
2 years ago
|
From 7a6fa42d4a4263c94b9bf18290f9e7680ea9e7f4 Mon Sep 17 00:00:00 2001
|
||
|
From: Nicolas Saenz Julienne <nsaenzju@redhat.com>
|
||
|
Date: Mon, 25 Apr 2022 09:57:23 +0200
|
||
|
Subject: [PATCH 03/16] util/event-loop-base: Introduce options to set the
|
||
|
thread pool size
|
||
|
|
||
|
RH-Author: Nicolas Saenz Julienne <nsaenzju@redhat.com>
|
||
|
RH-MergeRequest: 93: util/thread-pool: Expose minimum and maximum size
|
||
|
RH-Commit: [3/3] af78a88ff3c69701cbb5f9e980c3d6ebbd13ff98
|
||
|
RH-Bugzilla: 2031024
|
||
|
RH-Acked-by: Miroslav Rezanina <mrezanin@redhat.com>
|
||
|
RH-Acked-by: Stefano Garzarella <sgarzare@redhat.com>
|
||
|
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
|
||
|
|
||
|
The thread pool regulates itself: when idle, it kills threads until
|
||
|
empty, when in demand, it creates new threads until full. This behaviour
|
||
|
doesn't play well with latency sensitive workloads where the price of
|
||
|
creating a new thread is too high. For example, when paired with qemu's
|
||
|
'-mlock', or using safety features like SafeStack, creating a new thread
|
||
|
has been measured take multiple milliseconds.
|
||
|
|
||
|
In order to mitigate this let's introduce a new 'EventLoopBase'
|
||
|
property to set the thread pool size. The threads will be created during
|
||
|
the pool's initialization or upon updating the property's value, remain
|
||
|
available during its lifetime regardless of demand, and destroyed upon
|
||
|
freeing it. A properly characterized workload will then be able to
|
||
|
configure the pool to avoid any latency spikes.
|
||
|
|
||
|
Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
|
||
|
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
|
||
|
Acked-by: Markus Armbruster <armbru@redhat.com>
|
||
|
Message-id: 20220425075723.20019-4-nsaenzju@redhat.com
|
||
|
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
|
||
|
(cherry picked from commit 71ad4713cc1d7fca24388b828ef31ae6cb38a31c)
|
||
|
---
|
||
|
event-loop-base.c | 23 +++++++++++++
|
||
|
include/block/aio.h | 10 ++++++
|
||
|
include/block/thread-pool.h | 3 ++
|
||
|
include/sysemu/event-loop-base.h | 4 +++
|
||
|
iothread.c | 3 ++
|
||
|
qapi/qom.json | 10 +++++-
|
||
|
util/aio-posix.c | 1 +
|
||
|
util/async.c | 20 ++++++++++++
|
||
|
util/main-loop.c | 9 ++++++
|
||
|
util/thread-pool.c | 55 +++++++++++++++++++++++++++++---
|
||
|
10 files changed, 133 insertions(+), 5 deletions(-)
|
||
|
|
||
|
diff --git a/event-loop-base.c b/event-loop-base.c
|
||
|
index e7f99a6ec8..d5be4dc6fc 100644
|
||
|
--- a/event-loop-base.c
|
||
|
+++ b/event-loop-base.c
|
||
|
@@ -14,6 +14,7 @@
|
||
|
#include "qemu/osdep.h"
|
||
|
#include "qom/object_interfaces.h"
|
||
|
#include "qapi/error.h"
|
||
|
+#include "block/thread-pool.h"
|
||
|
#include "sysemu/event-loop-base.h"
|
||
|
|
||
|
typedef struct {
|
||
|
@@ -21,9 +22,22 @@ typedef struct {
|
||
|
ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
|
||
|
} EventLoopBaseParamInfo;
|
||
|
|
||
|
+static void event_loop_base_instance_init(Object *obj)
|
||
|
+{
|
||
|
+ EventLoopBase *base = EVENT_LOOP_BASE(obj);
|
||
|
+
|
||
|
+ base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
|
||
|
+}
|
||
|
+
|
||
|
static EventLoopBaseParamInfo aio_max_batch_info = {
|
||
|
"aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
|
||
|
};
|
||
|
+static EventLoopBaseParamInfo thread_pool_min_info = {
|
||
|
+ "thread-pool-min", offsetof(EventLoopBase, thread_pool_min),
|
||
|
+};
|
||
|
+static EventLoopBaseParamInfo thread_pool_max_info = {
|
||
|
+ "thread-pool-max", offsetof(EventLoopBase, thread_pool_max),
|
||
|
+};
|
||
|
|
||
|
static void event_loop_base_get_param(Object *obj, Visitor *v,
|
||
|
const char *name, void *opaque, Error **errp)
|
||
|
@@ -95,12 +109,21 @@ static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
|
||
|
event_loop_base_get_param,
|
||
|
event_loop_base_set_param,
|
||
|
NULL, &aio_max_batch_info);
|
||
|
+ object_class_property_add(klass, "thread-pool-min", "int",
|
||
|
+ event_loop_base_get_param,
|
||
|
+ event_loop_base_set_param,
|
||
|
+ NULL, &thread_pool_min_info);
|
||
|
+ object_class_property_add(klass, "thread-pool-max", "int",
|
||
|
+ event_loop_base_get_param,
|
||
|
+ event_loop_base_set_param,
|
||
|
+ NULL, &thread_pool_max_info);
|
||
|
}
|
||
|
|
||
|
static const TypeInfo event_loop_base_info = {
|
||
|
.name = TYPE_EVENT_LOOP_BASE,
|
||
|
.parent = TYPE_OBJECT,
|
||
|
.instance_size = sizeof(EventLoopBase),
|
||
|
+ .instance_init = event_loop_base_instance_init,
|
||
|
.class_size = sizeof(EventLoopBaseClass),
|
||
|
.class_init = event_loop_base_class_init,
|
||
|
.abstract = true,
|
||
|
diff --git a/include/block/aio.h b/include/block/aio.h
|
||
|
index 5634173b12..d128558f1d 100644
|
||
|
--- a/include/block/aio.h
|
||
|
+++ b/include/block/aio.h
|
||
|
@@ -192,6 +192,8 @@ struct AioContext {
|
||
|
QSLIST_HEAD(, Coroutine) scheduled_coroutines;
|
||
|
QEMUBH *co_schedule_bh;
|
||
|
|
||
|
+ int thread_pool_min;
|
||
|
+ int thread_pool_max;
|
||
|
/* Thread pool for performing work and receiving completion callbacks.
|
||
|
* Has its own locking.
|
||
|
*/
|
||
|
@@ -769,4 +771,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
|
||
|
void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
|
||
|
Error **errp);
|
||
|
|
||
|
+/**
|
||
|
+ * aio_context_set_thread_pool_params:
|
||
|
+ * @ctx: the aio context
|
||
|
+ * @min: min number of threads to have readily available in the thread pool
|
||
|
+ * @min: max number of threads the thread pool can contain
|
||
|
+ */
|
||
|
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
|
||
|
+ int64_t max, Error **errp);
|
||
|
#endif
|
||
|
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
|
||
|
index 7dd7d730a0..2020bcc92d 100644
|
||
|
--- a/include/block/thread-pool.h
|
||
|
+++ b/include/block/thread-pool.h
|
||
|
@@ -20,6 +20,8 @@
|
||
|
|
||
|
#include "block/block.h"
|
||
|
|
||
|
+#define THREAD_POOL_MAX_THREADS_DEFAULT 64
|
||
|
+
|
||
|
typedef int ThreadPoolFunc(void *opaque);
|
||
|
|
||
|
typedef struct ThreadPool ThreadPool;
|
||
|
@@ -33,5 +35,6 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
|
||
|
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
|
||
|
ThreadPoolFunc *func, void *arg);
|
||
|
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
|
||
|
+void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
|
||
|
|
||
|
#endif
|
||
|
diff --git a/include/sysemu/event-loop-base.h b/include/sysemu/event-loop-base.h
|
||
|
index fced4c9fea..2748bf6ae1 100644
|
||
|
--- a/include/sysemu/event-loop-base.h
|
||
|
+++ b/include/sysemu/event-loop-base.h
|
||
|
@@ -33,5 +33,9 @@ struct EventLoopBase {
|
||
|
|
||
|
/* AioContext AIO engine parameters */
|
||
|
int64_t aio_max_batch;
|
||
|
+
|
||
|
+ /* AioContext thread pool parameters */
|
||
|
+ int64_t thread_pool_min;
|
||
|
+ int64_t thread_pool_max;
|
||
|
};
|
||
|
#endif
|
||
|
diff --git a/iothread.c b/iothread.c
|
||
|
index 8fa2f3bfb8..529194a566 100644
|
||
|
--- a/iothread.c
|
||
|
+++ b/iothread.c
|
||
|
@@ -174,6 +174,9 @@ static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
|
||
|
aio_context_set_aio_params(iothread->ctx,
|
||
|
iothread->parent_obj.aio_max_batch,
|
||
|
errp);
|
||
|
+
|
||
|
+ aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
|
||
|
+ base->thread_pool_max, errp);
|
||
|
}
|
||
|
|
||
|
|
||
|
diff --git a/qapi/qom.json b/qapi/qom.json
|
||
|
index 7d4a2ac1b9..6a653c6636 100644
|
||
|
--- a/qapi/qom.json
|
||
|
+++ b/qapi/qom.json
|
||
|
@@ -508,10 +508,18 @@
|
||
|
# 0 means that the engine will use its default.
|
||
|
# (default: 0)
|
||
|
#
|
||
|
+# @thread-pool-min: minimum number of threads reserved in the thread pool
|
||
|
+# (default:0)
|
||
|
+#
|
||
|
+# @thread-pool-max: maximum number of threads the thread pool can contain
|
||
|
+# (default:64)
|
||
|
+#
|
||
|
# Since: 7.1
|
||
|
##
|
||
|
{ 'struct': 'EventLoopBaseProperties',
|
||
|
- 'data': { '*aio-max-batch': 'int' } }
|
||
|
+ 'data': { '*aio-max-batch': 'int',
|
||
|
+ '*thread-pool-min': 'int',
|
||
|
+ '*thread-pool-max': 'int' } }
|
||
|
|
||
|
##
|
||
|
# @IothreadProperties:
|
||
|
diff --git a/util/aio-posix.c b/util/aio-posix.c
|
||
|
index be0182a3c6..731f3826c0 100644
|
||
|
--- a/util/aio-posix.c
|
||
|
+++ b/util/aio-posix.c
|
||
|
@@ -15,6 +15,7 @@
|
||
|
|
||
|
#include "qemu/osdep.h"
|
||
|
#include "block/block.h"
|
||
|
+#include "block/thread-pool.h"
|
||
|
#include "qemu/main-loop.h"
|
||
|
#include "qemu/rcu.h"
|
||
|
#include "qemu/rcu_queue.h"
|
||
|
diff --git a/util/async.c b/util/async.c
|
||
|
index 2ea1172f3e..554ba70cca 100644
|
||
|
--- a/util/async.c
|
||
|
+++ b/util/async.c
|
||
|
@@ -563,6 +563,9 @@ AioContext *aio_context_new(Error **errp)
|
||
|
|
||
|
ctx->aio_max_batch = 0;
|
||
|
|
||
|
+ ctx->thread_pool_min = 0;
|
||
|
+ ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
|
||
|
+
|
||
|
return ctx;
|
||
|
fail:
|
||
|
g_source_destroy(&ctx->source);
|
||
|
@@ -696,3 +699,20 @@ void qemu_set_current_aio_context(AioContext *ctx)
|
||
|
assert(!get_my_aiocontext());
|
||
|
set_my_aiocontext(ctx);
|
||
|
}
|
||
|
+
|
||
|
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
|
||
|
+ int64_t max, Error **errp)
|
||
|
+{
|
||
|
+
|
||
|
+ if (min > max || !max || min > INT_MAX || max > INT_MAX) {
|
||
|
+ error_setg(errp, "bad thread-pool-min/thread-pool-max values");
|
||
|
+ return;
|
||
|
+ }
|
||
|
+
|
||
|
+ ctx->thread_pool_min = min;
|
||
|
+ ctx->thread_pool_max = max;
|
||
|
+
|
||
|
+ if (ctx->thread_pool) {
|
||
|
+ thread_pool_update_params(ctx->thread_pool, ctx);
|
||
|
+ }
|
||
|
+}
|
||
|
diff --git a/util/main-loop.c b/util/main-loop.c
|
||
|
index 5b13f456fa..a0f48186ab 100644
|
||
|
--- a/util/main-loop.c
|
||
|
+++ b/util/main-loop.c
|
||
|
@@ -30,6 +30,7 @@
|
||
|
#include "sysemu/replay.h"
|
||
|
#include "qemu/main-loop.h"
|
||
|
#include "block/aio.h"
|
||
|
+#include "block/thread-pool.h"
|
||
|
#include "qemu/error-report.h"
|
||
|
#include "qemu/queue.h"
|
||
|
#include "qemu/compiler.h"
|
||
|
@@ -187,12 +188,20 @@ int qemu_init_main_loop(Error **errp)
|
||
|
|
||
|
static void main_loop_update_params(EventLoopBase *base, Error **errp)
|
||
|
{
|
||
|
+ ERRP_GUARD();
|
||
|
+
|
||
|
if (!qemu_aio_context) {
|
||
|
error_setg(errp, "qemu aio context not ready");
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
|
||
|
+ if (*errp) {
|
||
|
+ return;
|
||
|
+ }
|
||
|
+
|
||
|
+ aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min,
|
||
|
+ base->thread_pool_max, errp);
|
||
|
}
|
||
|
|
||
|
MainLoop *mloop;
|
||
|
diff --git a/util/thread-pool.c b/util/thread-pool.c
|
||
|
index d763cea505..196835b4d3 100644
|
||
|
--- a/util/thread-pool.c
|
||
|
+++ b/util/thread-pool.c
|
||
|
@@ -58,7 +58,6 @@ struct ThreadPool {
|
||
|
QemuMutex lock;
|
||
|
QemuCond worker_stopped;
|
||
|
QemuSemaphore sem;
|
||
|
- int max_threads;
|
||
|
QEMUBH *new_thread_bh;
|
||
|
|
||
|
/* The following variables are only accessed from one AioContext. */
|
||
|
@@ -71,8 +70,27 @@ struct ThreadPool {
|
||
|
int new_threads; /* backlog of threads we need to create */
|
||
|
int pending_threads; /* threads created but not running yet */
|
||
|
bool stopping;
|
||
|
+ int min_threads;
|
||
|
+ int max_threads;
|
||
|
};
|
||
|
|
||
|
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
|
||
|
+{
|
||
|
+ /*
|
||
|
+ * The semaphore timed out, we should exit the loop except when:
|
||
|
+ * - There is work to do, we raced with the signal.
|
||
|
+ * - The max threads threshold just changed, we raced with the signal.
|
||
|
+ * - The thread pool forces a minimum number of readily available threads.
|
||
|
+ */
|
||
|
+ if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
|
||
|
+ pool->cur_threads > pool->max_threads ||
|
||
|
+ pool->cur_threads <= pool->min_threads)) {
|
||
|
+ return true;
|
||
|
+ }
|
||
|
+
|
||
|
+ return false;
|
||
|
+}
|
||
|
+
|
||
|
static void *worker_thread(void *opaque)
|
||
|
{
|
||
|
ThreadPool *pool = opaque;
|
||
|
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
|
||
|
ret = qemu_sem_timedwait(&pool->sem, 10000);
|
||
|
qemu_mutex_lock(&pool->lock);
|
||
|
pool->idle_threads--;
|
||
|
- } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
|
||
|
- if (ret == -1 || pool->stopping) {
|
||
|
+ } while (back_to_sleep(pool, ret));
|
||
|
+ if (ret == -1 || pool->stopping ||
|
||
|
+ pool->cur_threads > pool->max_threads) {
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
|
||
|
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
|
||
|
}
|
||
|
|
||
|
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
|
||
|
+{
|
||
|
+ qemu_mutex_lock(&pool->lock);
|
||
|
+
|
||
|
+ pool->min_threads = ctx->thread_pool_min;
|
||
|
+ pool->max_threads = ctx->thread_pool_max;
|
||
|
+
|
||
|
+ /*
|
||
|
+ * We either have to:
|
||
|
+ * - Increase the number available of threads until over the min_threads
|
||
|
+ * threshold.
|
||
|
+ * - Decrease the number of available threads until under the max_threads
|
||
|
+ * threshold.
|
||
|
+ * - Do nothing. The current number of threads fall in between the min and
|
||
|
+ * max thresholds. We'll let the pool manage itself.
|
||
|
+ */
|
||
|
+ for (int i = pool->cur_threads; i < pool->min_threads; i++) {
|
||
|
+ spawn_thread(pool);
|
||
|
+ }
|
||
|
+
|
||
|
+ for (int i = pool->cur_threads; i > pool->max_threads; i--) {
|
||
|
+ qemu_sem_post(&pool->sem);
|
||
|
+ }
|
||
|
+
|
||
|
+ qemu_mutex_unlock(&pool->lock);
|
||
|
+}
|
||
|
+
|
||
|
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
|
||
|
{
|
||
|
if (!ctx) {
|
||
|
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
|
||
|
qemu_mutex_init(&pool->lock);
|
||
|
qemu_cond_init(&pool->worker_stopped);
|
||
|
qemu_sem_init(&pool->sem, 0);
|
||
|
- pool->max_threads = 64;
|
||
|
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
|
||
|
|
||
|
QLIST_INIT(&pool->head);
|
||
|
QTAILQ_INIT(&pool->request_list);
|
||
|
+
|
||
|
+ thread_pool_update_params(pool, ctx);
|
||
|
}
|
||
|
|
||
|
ThreadPool *thread_pool_new(AioContext *ctx)
|
||
|
--
|
||
|
2.31.1
|
||
|
|