You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1280 lines
36 KiB
1280 lines
36 KiB
1 year ago
|
From d0e4238471c7bcb0757a877622fea593e8906fcc Mon Sep 17 00:00:00 2001
|
||
|
From: "Richard W.M. Jones" <rjones@redhat.com>
|
||
|
Date: Thu, 27 Jul 2023 17:08:25 +0100
|
||
|
Subject: [PATCH] curl: Use curl multi interface
|
||
|
|
||
|
See the comment at the top of plugins/curl/pool.c for general
|
||
|
information about how this works.
|
||
|
|
||
|
This makes a very large difference to performance over the previous
|
||
|
implementation. Note that for the tests below I also applied the next
|
||
|
commit, which changes the behaviour of the connections parameter.
|
||
|
|
||
|
Using this test case:
|
||
|
|
||
|
$ http=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img
|
||
|
$ nbdkit -r -U - curl $http ipresolve=v4 --run 'nbdcopy -p $uri null'
|
||
|
|
||
|
The times are as follows:
|
||
|
|
||
|
multi, connections=64 17.2s
|
||
|
multi, connections=32 21.7s
|
||
|
wget 28.4s
|
||
|
multi, connections=16 41.3s
|
||
|
before this commit 180s
|
||
|
|
||
|
(cherry picked from commit a74b289ee15a7c75dceb8a96403f1fa8ebd72e88)
|
||
|
---
|
||
|
plugins/curl/config.c | 246 --------------------------
|
||
|
plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++-----
|
||
|
plugins/curl/curldefs.h | 38 ++--
|
||
|
plugins/curl/pool.c | 381 ++++++++++++++++++++++++++++++++--------
|
||
|
4 files changed, 652 insertions(+), 379 deletions(-)
|
||
|
|
||
|
diff --git a/plugins/curl/config.c b/plugins/curl/config.c
|
||
|
index 276c79d5..ce82d5f9 100644
|
||
|
--- a/plugins/curl/config.c
|
||
|
+++ b/plugins/curl/config.c
|
||
|
@@ -48,8 +48,6 @@
|
||
|
|
||
|
#include <nbdkit-plugin.h>
|
||
|
|
||
|
-#include "ascii-ctype.h"
|
||
|
-#include "ascii-string.h"
|
||
|
#include "cleanup.h"
|
||
|
|
||
|
#include "curldefs.h"
|
||
|
@@ -89,12 +87,6 @@ static const char *user_agent = NULL;
|
||
|
|
||
|
static int debug_cb (CURL *handle, curl_infotype type,
|
||
|
const char *data, size_t size, void *);
|
||
|
-static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
|
||
|
-static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
|
||
|
-static int get_content_length_accept_range (struct curl_handle *ch);
|
||
|
-static bool try_fallback_GET_method (struct curl_handle *ch);
|
||
|
-static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
|
||
|
-static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
|
||
|
|
||
|
/* Use '-D curl.verbose=1' to set. */
|
||
|
NBDKIT_DLL_PUBLIC int curl_debug_verbose = 0;
|
||
|
@@ -671,17 +663,9 @@ allocate_handle (void)
|
||
|
if (user_agent)
|
||
|
curl_easy_setopt (ch->c, CURLOPT_USERAGENT, user_agent);
|
||
|
|
||
|
- if (get_content_length_accept_range (ch) == -1)
|
||
|
- goto err;
|
||
|
-
|
||
|
/* Get set up for reading and writing. */
|
||
|
curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, NULL);
|
||
|
curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, NULL);
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
|
||
|
- /* These are only used if !readonly but we always register them. */
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
|
||
|
|
||
|
return ch;
|
||
|
|
||
|
@@ -771,233 +755,3 @@ debug_cb (CURL *handle, curl_infotype type,
|
||
|
out:
|
||
|
return 0;
|
||
|
}
|
||
|
-
|
||
|
-/* NB: The terminology used by libcurl is confusing!
|
||
|
- *
|
||
|
- * WRITEFUNCTION / write_cb is used when reading from the remote server
|
||
|
- * READFUNCTION / read_cb is used when writing to the remote server.
|
||
|
- *
|
||
|
- * We use the same terminology as libcurl here.
|
||
|
- */
|
||
|
-
|
||
|
-static size_t
|
||
|
-write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
|
||
|
-{
|
||
|
- struct curl_handle *ch = opaque;
|
||
|
- size_t orig_realsize = size * nmemb;
|
||
|
- size_t realsize = orig_realsize;
|
||
|
-
|
||
|
- assert (ch->write_buf);
|
||
|
-
|
||
|
- /* Don't read more than the requested amount of data, even if the
|
||
|
- * server or libcurl sends more.
|
||
|
- */
|
||
|
- if (realsize > ch->write_count)
|
||
|
- realsize = ch->write_count;
|
||
|
-
|
||
|
- memcpy (ch->write_buf, ptr, realsize);
|
||
|
-
|
||
|
- ch->write_count -= realsize;
|
||
|
- ch->write_buf += realsize;
|
||
|
-
|
||
|
- return orig_realsize;
|
||
|
-}
|
||
|
-
|
||
|
-static size_t
|
||
|
-read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
|
||
|
-{
|
||
|
- struct curl_handle *ch = opaque;
|
||
|
- size_t realsize = size * nmemb;
|
||
|
-
|
||
|
- assert (ch->read_buf);
|
||
|
- if (realsize > ch->read_count)
|
||
|
- realsize = ch->read_count;
|
||
|
-
|
||
|
- memcpy (ptr, ch->read_buf, realsize);
|
||
|
-
|
||
|
- ch->read_count -= realsize;
|
||
|
- ch->read_buf += realsize;
|
||
|
-
|
||
|
- return realsize;
|
||
|
-}
|
||
|
-
|
||
|
-/* Get the file size and also whether the remote HTTP server
|
||
|
- * supports byte ranges.
|
||
|
- */
|
||
|
-static int
|
||
|
-get_content_length_accept_range (struct curl_handle *ch)
|
||
|
-{
|
||
|
- CURLcode r;
|
||
|
- long code;
|
||
|
-#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
|
||
|
- curl_off_t o;
|
||
|
-#else
|
||
|
- double d;
|
||
|
-#endif
|
||
|
-
|
||
|
- /* We must run the scripts if necessary and set headers in the
|
||
|
- * handle.
|
||
|
- */
|
||
|
- if (do_scripts (ch) == -1)
|
||
|
- return -1;
|
||
|
-
|
||
|
- /* Set this flag in the handle to false. The callback should set it
|
||
|
- * to true if byte ranges are supported, which we check below.
|
||
|
- */
|
||
|
- ch->accept_range = false;
|
||
|
-
|
||
|
- /* No Body, not nobody! This forces a HEAD request. */
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
|
||
|
- r = curl_easy_perform (ch->c);
|
||
|
- update_times (ch->c);
|
||
|
- if (r != CURLE_OK) {
|
||
|
- display_curl_error (ch, r,
|
||
|
- "problem doing HEAD request to fetch size of URL [%s]",
|
||
|
- url);
|
||
|
-
|
||
|
- /* Get the HTTP status code, if available. */
|
||
|
- r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
|
||
|
- if (r == CURLE_OK)
|
||
|
- nbdkit_debug ("HTTP status code: %ld", code);
|
||
|
- else
|
||
|
- code = -1;
|
||
|
-
|
||
|
- /* See comment on try_fallback_GET_method below. */
|
||
|
- if (code != 403 || !try_fallback_GET_method (ch))
|
||
|
- return -1;
|
||
|
- }
|
||
|
-
|
||
|
- /* Get the content length.
|
||
|
- *
|
||
|
- * Note there is some subtlety here: For web servers using chunked
|
||
|
- * encoding, either the Content-Length header will not be present,
|
||
|
- * or if present it should be ignored. (For such servers the only
|
||
|
- * way to find out the true length would be to read all of the
|
||
|
- * content, which we don't want to do).
|
||
|
- *
|
||
|
- * Curl itself resolves this for us. It will ignore the
|
||
|
- * Content-Length header if chunked encoding is used, returning the
|
||
|
- * length as -1 which we check below (see also
|
||
|
- * curl:lib/http.c:Curl_http_size).
|
||
|
- */
|
||
|
-#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
|
||
|
- r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
|
||
|
- if (r != CURLE_OK) {
|
||
|
- display_curl_error (ch, r,
|
||
|
- "could not get length of remote file [%s]", url);
|
||
|
- return -1;
|
||
|
- }
|
||
|
-
|
||
|
- if (o == -1) {
|
||
|
- nbdkit_error ("could not get length of remote file [%s], "
|
||
|
- "is the URL correct?", url);
|
||
|
- return -1;
|
||
|
- }
|
||
|
-
|
||
|
- ch->exportsize = o;
|
||
|
-#else
|
||
|
- r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
|
||
|
- if (r != CURLE_OK) {
|
||
|
- display_curl_error (ch, r,
|
||
|
- "could not get length of remote file [%s]", url);
|
||
|
- return -1;
|
||
|
- }
|
||
|
-
|
||
|
- if (d == -1) {
|
||
|
- nbdkit_error ("could not get length of remote file [%s], "
|
||
|
- "is the URL correct?", url);
|
||
|
- return -1;
|
||
|
- }
|
||
|
-
|
||
|
- ch->exportsize = d;
|
||
|
-#endif
|
||
|
- nbdkit_debug ("content length: %" PRIi64, ch->exportsize);
|
||
|
-
|
||
|
- /* If this is HTTP, check that byte ranges are supported. */
|
||
|
- if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
|
||
|
- ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
|
||
|
- if (!ch->accept_range) {
|
||
|
- nbdkit_error ("server does not support 'range' (byte range) requests");
|
||
|
- return -1;
|
||
|
- }
|
||
|
-
|
||
|
- nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
|
||
|
- }
|
||
|
-
|
||
|
- return 0;
|
||
|
-}
|
||
|
-
|
||
|
-/* S3 servers can return 403 Forbidden for HEAD but still respond
|
||
|
- * to GET, so we give it a second chance in that case.
|
||
|
- * https://github.com/kubevirt/containerized-data-importer/issues/2737
|
||
|
- *
|
||
|
- * This function issues a GET request with a writefunction that always
|
||
|
- * returns an error, thus effectively getting the headers but
|
||
|
- * abandoning the transfer as soon as possible after.
|
||
|
- */
|
||
|
-static bool
|
||
|
-try_fallback_GET_method (struct curl_handle *ch)
|
||
|
-{
|
||
|
- CURLcode r;
|
||
|
-
|
||
|
- nbdkit_debug ("attempting to fetch headers using GET method");
|
||
|
-
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
|
||
|
- curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
|
||
|
- r = curl_easy_perform (ch->c);
|
||
|
- update_times (ch->c);
|
||
|
-
|
||
|
- /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
|
||
|
- * (eg if the remote has zero length). Other errors might happen
|
||
|
- * but we ignore them since it is a fallback path.
|
||
|
- */
|
||
|
- return r == CURLE_OK || r == CURLE_WRITE_ERROR;
|
||
|
-}
|
||
|
-
|
||
|
-static size_t
|
||
|
-header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
|
||
|
-{
|
||
|
- struct curl_handle *ch = opaque;
|
||
|
- size_t realsize = size * nmemb;
|
||
|
- const char *header = ptr;
|
||
|
- const char *end = header + realsize;
|
||
|
- const char *accept_ranges = "accept-ranges:";
|
||
|
- const char *bytes = "bytes";
|
||
|
-
|
||
|
- if (realsize >= strlen (accept_ranges) &&
|
||
|
- ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
|
||
|
- const char *p = strchr (header, ':') + 1;
|
||
|
-
|
||
|
- /* Skip whitespace between the header name and value. */
|
||
|
- while (p < end && *p && ascii_isspace (*p))
|
||
|
- p++;
|
||
|
-
|
||
|
- if (end - p >= strlen (bytes)
|
||
|
- && strncmp (p, bytes, strlen (bytes)) == 0) {
|
||
|
- /* Check that there is nothing but whitespace after the value. */
|
||
|
- p += strlen (bytes);
|
||
|
- while (p < end && *p && ascii_isspace (*p))
|
||
|
- p++;
|
||
|
-
|
||
|
- if (p == end || !*p)
|
||
|
- ch->accept_range = true;
|
||
|
- }
|
||
|
- }
|
||
|
-
|
||
|
- return realsize;
|
||
|
-}
|
||
|
-
|
||
|
-static size_t
|
||
|
-error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
|
||
|
-{
|
||
|
-#ifdef CURL_WRITEFUNC_ERROR
|
||
|
- return CURL_WRITEFUNC_ERROR;
|
||
|
-#else
|
||
|
- return 0; /* in older curl, any size < requested will also be an error */
|
||
|
-#endif
|
||
|
-}
|
||
|
diff --git a/plugins/curl/curl.c b/plugins/curl/curl.c
|
||
|
index be42de36..28cc7bbe 100644
|
||
|
--- a/plugins/curl/curl.c
|
||
|
+++ b/plugins/curl/curl.c
|
||
|
@@ -48,7 +48,8 @@
|
||
|
|
||
|
#include <nbdkit-plugin.h>
|
||
|
|
||
|
-#include "cleanup.h"
|
||
|
+#include "ascii-ctype.h"
|
||
|
+#include "ascii-string.h"
|
||
|
|
||
|
#include "curldefs.h"
|
||
|
|
||
|
@@ -118,32 +119,6 @@ curl_close (void *handle)
|
||
|
|
||
|
#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL
|
||
|
|
||
|
-/* Calls get_handle() ... put_handle() to get a handle for the length
|
||
|
- * of the current scope.
|
||
|
- */
|
||
|
-#define GET_HANDLE_FOR_CURRENT_SCOPE(ch) \
|
||
|
- CLEANUP_PUT_HANDLE struct curl_handle *ch = get_handle ();
|
||
|
-#define CLEANUP_PUT_HANDLE __attribute__ ((cleanup (cleanup_put_handle)))
|
||
|
-static void
|
||
|
-cleanup_put_handle (void *chp)
|
||
|
-{
|
||
|
- struct curl_handle *ch = * (struct curl_handle **) chp;
|
||
|
-
|
||
|
- if (ch != NULL)
|
||
|
- put_handle (ch);
|
||
|
-}
|
||
|
-
|
||
|
-/* Get the file size. */
|
||
|
-static int64_t
|
||
|
-curl_get_size (void *handle)
|
||
|
-{
|
||
|
- GET_HANDLE_FOR_CURRENT_SCOPE (ch);
|
||
|
- if (ch == NULL)
|
||
|
- return -1;
|
||
|
-
|
||
|
- return ch->exportsize;
|
||
|
-}
|
||
|
-
|
||
|
/* Multi-conn is safe for read-only connections, but HTTP does not
|
||
|
* have any concept of flushing so we cannot use it for read-write
|
||
|
* connections.
|
||
|
@@ -156,23 +131,253 @@ curl_can_multi_conn (void *handle)
|
||
|
return !! h->readonly;
|
||
|
}
|
||
|
|
||
|
+/* Get the file size. */
|
||
|
+static int get_content_length_accept_range (struct curl_handle *ch);
|
||
|
+static bool try_fallback_GET_method (struct curl_handle *ch);
|
||
|
+static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
|
||
|
+static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
|
||
|
+
|
||
|
+static int64_t
|
||
|
+curl_get_size (void *handle)
|
||
|
+{
|
||
|
+ struct curl_handle *ch;
|
||
|
+ CURLcode r;
|
||
|
+ long code;
|
||
|
+#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
|
||
|
+ curl_off_t o;
|
||
|
+#else
|
||
|
+ double d;
|
||
|
+#endif
|
||
|
+ int64_t exportsize;
|
||
|
+
|
||
|
+ /* Get a curl easy handle. */
|
||
|
+ ch = allocate_handle ();
|
||
|
+ if (ch == NULL) goto err;
|
||
|
+
|
||
|
+ /* Prepare to read the headers. */
|
||
|
+ if (get_content_length_accept_range (ch) == -1)
|
||
|
+ goto err;
|
||
|
+
|
||
|
+ /* Send the command to the worker thread and wait. */
|
||
|
+ struct command cmd = {
|
||
|
+ .type = EASY_HANDLE,
|
||
|
+ .ch = ch,
|
||
|
+ };
|
||
|
+
|
||
|
+ r = send_command_and_wait (&cmd);
|
||
|
+ update_times (ch->c);
|
||
|
+ if (r != CURLE_OK) {
|
||
|
+ display_curl_error (ch, r,
|
||
|
+ "problem doing HEAD request to fetch size of URL [%s]",
|
||
|
+ url);
|
||
|
+
|
||
|
+ /* Get the HTTP status code, if available. */
|
||
|
+ r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
|
||
|
+ if (r == CURLE_OK)
|
||
|
+ nbdkit_debug ("HTTP status code: %ld", code);
|
||
|
+ else
|
||
|
+ code = -1;
|
||
|
+
|
||
|
+ /* See comment on try_fallback_GET_method below. */
|
||
|
+ if (code != 403 || !try_fallback_GET_method (ch))
|
||
|
+ goto err;
|
||
|
+ }
|
||
|
+
|
||
|
+ /* Get the content length.
|
||
|
+ *
|
||
|
+ * Note there is some subtlety here: For web servers using chunked
|
||
|
+ * encoding, either the Content-Length header will not be present,
|
||
|
+ * or if present it should be ignored. (For such servers the only
|
||
|
+ * way to find out the true length would be to read all of the
|
||
|
+ * content, which we don't want to do).
|
||
|
+ *
|
||
|
+ * Curl itself resolves this for us. It will ignore the
|
||
|
+ * Content-Length header if chunked encoding is used, returning the
|
||
|
+ * length as -1 which we check below (see also
|
||
|
+ * curl:lib/http.c:Curl_http_size).
|
||
|
+ */
|
||
|
+#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
|
||
|
+ r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
|
||
|
+ if (r != CURLE_OK) {
|
||
|
+ display_curl_error (ch, r,
|
||
|
+ "could not get length of remote file [%s]", url);
|
||
|
+ goto err;
|
||
|
+ }
|
||
|
+
|
||
|
+ if (o == -1) {
|
||
|
+ nbdkit_error ("could not get length of remote file [%s], "
|
||
|
+ "is the URL correct?", url);
|
||
|
+ goto err;
|
||
|
+ }
|
||
|
+
|
||
|
+ exportsize = o;
|
||
|
+#else
|
||
|
+ r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
|
||
|
+ if (r != CURLE_OK) {
|
||
|
+ display_curl_error (ch, r,
|
||
|
+ "could not get length of remote file [%s]", url);
|
||
|
+ goto err;
|
||
|
+ }
|
||
|
+
|
||
|
+ if (d == -1) {
|
||
|
+ nbdkit_error ("could not get length of remote file [%s], "
|
||
|
+ "is the URL correct?", url);
|
||
|
+ goto err;
|
||
|
+ }
|
||
|
+
|
||
|
+ exportsize = d;
|
||
|
+#endif
|
||
|
+ nbdkit_debug ("content length: %" PRIi64, exportsize);
|
||
|
+
|
||
|
+ /* If this is HTTP, check that byte ranges are supported. */
|
||
|
+ if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
|
||
|
+ ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
|
||
|
+ if (!ch->accept_range) {
|
||
|
+ nbdkit_error ("server does not support 'range' (byte range) requests");
|
||
|
+ goto err;
|
||
|
+ }
|
||
|
+
|
||
|
+ nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
|
||
|
+ }
|
||
|
+
|
||
|
+ free_handle (ch);
|
||
|
+ return exportsize;
|
||
|
+
|
||
|
+ err:
|
||
|
+ if (ch)
|
||
|
+ free_handle (ch);
|
||
|
+ return -1;
|
||
|
+}
|
||
|
+
|
||
|
+/* Set up the handle for a HEAD request so that the file size and
|
||
|
+ * byte range support can be read from the response headers.
|
||
|
+ */
|
||
|
+static int
|
||
|
+get_content_length_accept_range (struct curl_handle *ch)
|
||
|
+{
|
||
|
+ /* We must run the scripts if necessary and set headers in the
|
||
|
+ * handle.
|
||
|
+ */
|
||
|
+ if (do_scripts (ch) == -1)
|
||
|
+ return -1;
|
||
|
+
|
||
|
+ /* Set this flag in the handle to false. The callback should set it
|
||
|
+ * to true if byte ranges are supported, which we check below.
|
||
|
+ */
|
||
|
+ ch->accept_range = false;
|
||
|
+
|
||
|
+ /* No Body, not nobody! This forces a HEAD request. */
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
|
||
|
+ return 0;
|
||
|
+}
|
||
|
+
|
||
|
+/* S3 servers can return 403 Forbidden for HEAD but still respond
|
||
|
+ * to GET, so we give it a second chance in that case.
|
||
|
+ * https://github.com/kubevirt/containerized-data-importer/issues/2737
|
||
|
+ *
|
||
|
+ * This function issues a GET request with a writefunction that always
|
||
|
+ * returns an error, thus effectively getting the headers but
|
||
|
+ * abandoning the transfer as soon as possible after.
|
||
|
+ */
|
||
|
+static bool
|
||
|
+try_fallback_GET_method (struct curl_handle *ch)
|
||
|
+{
|
||
|
+ CURLcode r;
|
||
|
+
|
||
|
+ nbdkit_debug ("attempting to fetch headers using GET method");
|
||
|
+
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
|
||
|
+
|
||
|
+ struct command cmd = {
|
||
|
+ .type = EASY_HANDLE,
|
||
|
+ .ch = ch,
|
||
|
+ };
|
||
|
+
|
||
|
+ r = send_command_and_wait (&cmd);
|
||
|
+ update_times (ch->c);
|
||
|
+
|
||
|
+ /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
|
||
|
+ * (eg if the remote has zero length). Other errors might happen
|
||
|
+ * but we ignore them since it is a fallback path.
|
||
|
+ */
|
||
|
+ return r == CURLE_OK || r == CURLE_WRITE_ERROR;
|
||
|
+}
|
||
|
+
|
||
|
+static size_t
|
||
|
+header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
|
||
|
+{
|
||
|
+ struct curl_handle *ch = opaque;
|
||
|
+ size_t realsize = size * nmemb;
|
||
|
+ const char *header = ptr;
|
||
|
+ const char *end = header + realsize;
|
||
|
+ const char *accept_ranges = "accept-ranges:";
|
||
|
+ const char *bytes = "bytes";
|
||
|
+
|
||
|
+ if (realsize >= strlen (accept_ranges) &&
|
||
|
+ ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
|
||
|
+ const char *p = strchr (header, ':') + 1;
|
||
|
+
|
||
|
+ /* Skip whitespace between the header name and value. */
|
||
|
+ while (p < end && *p && ascii_isspace (*p))
|
||
|
+ p++;
|
||
|
+
|
||
|
+ if (end - p >= strlen (bytes)
|
||
|
+ && strncmp (p, bytes, strlen (bytes)) == 0) {
|
||
|
+ /* Check that there is nothing but whitespace after the value. */
|
||
|
+ p += strlen (bytes);
|
||
|
+ while (p < end && *p && ascii_isspace (*p))
|
||
|
+ p++;
|
||
|
+
|
||
|
+ if (p == end || !*p)
|
||
|
+ ch->accept_range = true;
|
||
|
+ }
|
||
|
+ }
|
||
|
+
|
||
|
+ return realsize;
|
||
|
+}
|
||
|
+
|
||
|
+static size_t
|
||
|
+error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
|
||
|
+{
|
||
|
+#ifdef CURL_WRITEFUNC_ERROR
|
||
|
+ return CURL_WRITEFUNC_ERROR;
|
||
|
+#else
|
||
|
+ return 0; /* in older curl, any size < requested will also be an error */
|
||
|
+#endif
|
||
|
+}
|
||
|
+
|
||
|
/* Read data from the remote server. */
|
||
|
+static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
|
||
|
+
|
||
|
static int
|
||
|
curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
|
||
|
{
|
||
|
CURLcode r;
|
||
|
+ struct curl_handle *ch;
|
||
|
char range[128];
|
||
|
|
||
|
- GET_HANDLE_FOR_CURRENT_SCOPE (ch);
|
||
|
- if (ch == NULL)
|
||
|
- return -1;
|
||
|
+ /* Get a curl easy handle. */
|
||
|
+ ch = allocate_handle ();
|
||
|
+ if (ch == NULL) goto err;
|
||
|
|
||
|
/* Run the scripts if necessary and set headers in the handle. */
|
||
|
- if (do_scripts (ch) == -1) return -1;
|
||
|
+ if (do_scripts (ch) == -1) goto err;
|
||
|
|
||
|
/* Tell the write_cb where we want the data to be written. write_cb
|
||
|
* will update this if the data comes in multiple sections.
|
||
|
*/
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
|
||
|
ch->write_buf = buf;
|
||
|
ch->write_count = count;
|
||
|
|
||
|
@@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
|
||
|
offset, offset + count);
|
||
|
curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
|
||
|
|
||
|
- /* The assumption here is that curl will look after timeouts. */
|
||
|
- r = curl_easy_perform (ch->c);
|
||
|
+ /* Send the command to the worker thread and wait. */
|
||
|
+ struct command cmd = {
|
||
|
+ .type = EASY_HANDLE,
|
||
|
+ .ch = ch,
|
||
|
+ };
|
||
|
+
|
||
|
+ r = send_command_and_wait (&cmd);
|
||
|
if (r != CURLE_OK) {
|
||
|
- display_curl_error (ch, r, "pread: curl_easy_perform");
|
||
|
- return -1;
|
||
|
+ display_curl_error (ch, r, "pread");
|
||
|
+ goto err;
|
||
|
}
|
||
|
update_times (ch->c);
|
||
|
|
||
|
@@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
|
||
|
/* As far as I understand the cURL API, this should never happen. */
|
||
|
assert (ch->write_count == 0);
|
||
|
|
||
|
+ free_handle (ch);
|
||
|
return 0;
|
||
|
+
|
||
|
+ err:
|
||
|
+ if (ch)
|
||
|
+ free_handle (ch);
|
||
|
+ return -1;
|
||
|
+}
|
||
|
+
|
||
|
+/* NB: The terminology used by libcurl is confusing!
|
||
|
+ *
|
||
|
+ * WRITEFUNCTION / write_cb is used when reading from the remote server
|
||
|
+ * READFUNCTION / read_cb is used when writing to the remote server.
|
||
|
+ *
|
||
|
+ * We use the same terminology as libcurl here.
|
||
|
+ */
|
||
|
+static size_t
|
||
|
+write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
|
||
|
+{
|
||
|
+ struct curl_handle *ch = opaque;
|
||
|
+ size_t orig_realsize = size * nmemb;
|
||
|
+ size_t realsize = orig_realsize;
|
||
|
+
|
||
|
+ assert (ch->write_buf);
|
||
|
+
|
||
|
+ /* Don't read more than the requested amount of data, even if the
|
||
|
+ * server or libcurl sends more.
|
||
|
+ */
|
||
|
+ if (realsize > ch->write_count)
|
||
|
+ realsize = ch->write_count;
|
||
|
+
|
||
|
+ memcpy (ch->write_buf, ptr, realsize);
|
||
|
+
|
||
|
+ ch->write_count -= realsize;
|
||
|
+ ch->write_buf += realsize;
|
||
|
+
|
||
|
+ return orig_realsize;
|
||
|
}
|
||
|
|
||
|
/* Write data to the remote server. */
|
||
|
+static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
|
||
|
+
|
||
|
static int
|
||
|
curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
|
||
|
{
|
||
|
CURLcode r;
|
||
|
+ struct curl_handle *ch;
|
||
|
char range[128];
|
||
|
|
||
|
- GET_HANDLE_FOR_CURRENT_SCOPE (ch);
|
||
|
- if (ch == NULL)
|
||
|
- return -1;
|
||
|
+ /* Get a curl easy handle. */
|
||
|
+ ch = allocate_handle ();
|
||
|
+ if (ch == NULL) goto err;
|
||
|
|
||
|
/* Run the scripts if necessary and set headers in the handle. */
|
||
|
- if (do_scripts (ch) == -1) return -1;
|
||
|
+ if (do_scripts (ch) == -1) goto err;
|
||
|
|
||
|
/* Tell the read_cb where we want the data to be read from. read_cb
|
||
|
* will update this if the data comes in multiple sections.
|
||
|
*/
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
|
||
|
+ curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
|
||
|
ch->read_buf = buf;
|
||
|
ch->read_count = count;
|
||
|
|
||
|
@@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
|
||
|
offset, offset + count);
|
||
|
curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
|
||
|
|
||
|
- /* The assumption here is that curl will look after timeouts. */
|
||
|
- r = curl_easy_perform (ch->c);
|
||
|
+ /* Send the command to the worker thread and wait. */
|
||
|
+ struct command cmd = {
|
||
|
+ .type = EASY_HANDLE,
|
||
|
+ .ch = ch,
|
||
|
+ };
|
||
|
+
|
||
|
+ r = send_command_and_wait (&cmd);
|
||
|
if (r != CURLE_OK) {
|
||
|
- display_curl_error (ch, r, "pwrite: curl_easy_perform");
|
||
|
- return -1;
|
||
|
+ display_curl_error (ch, r, "pwrite");
|
||
|
+ goto err;
|
||
|
}
|
||
|
update_times (ch->c);
|
||
|
|
||
|
@@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
|
||
|
/* As far as I understand the cURL API, this should never happen. */
|
||
|
assert (ch->read_count == 0);
|
||
|
|
||
|
+ free_handle (ch);
|
||
|
return 0;
|
||
|
+
|
||
|
+ err:
|
||
|
+ if (ch)
|
||
|
+ free_handle (ch);
|
||
|
+ return -1;
|
||
|
+}
|
||
|
+
|
||
|
+static size_t
|
||
|
+read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
|
||
|
+{
|
||
|
+ struct curl_handle *ch = opaque;
|
||
|
+ size_t realsize = size * nmemb;
|
||
|
+
|
||
|
+ assert (ch->read_buf);
|
||
|
+ if (realsize > ch->read_count)
|
||
|
+ realsize = ch->read_count;
|
||
|
+
|
||
|
+ memcpy (ptr, ch->read_buf, realsize);
|
||
|
+
|
||
|
+ ch->read_count -= realsize;
|
||
|
+ ch->read_buf += realsize;
|
||
|
+
|
||
|
+ return realsize;
|
||
|
}
|
||
|
|
||
|
static struct nbdkit_plugin plugin = {
|
||
|
diff --git a/plugins/curl/curldefs.h b/plugins/curl/curldefs.h
|
||
|
index 939c8d37..6b158d85 100644
|
||
|
--- a/plugins/curl/curldefs.h
|
||
|
+++ b/plugins/curl/curldefs.h
|
||
|
@@ -62,6 +62,9 @@
|
||
|
#define HAVE_CURLINFO_TOTAL_TIME_T
|
||
|
#define HAVE_CURLINFO_REDIRECT_TIME_T
|
||
|
#endif
|
||
|
+#if CURL_AT_LEAST_VERSION (7, 66, 0)
|
||
|
+#define HAVE_CURL_MULTI_POLL
|
||
|
+#endif
|
||
|
#if CURL_AT_LEAST_VERSION (8, 2, 0)
|
||
|
#define HAVE_CURLINFO_CONN_ID
|
||
|
#define HAVE_CURLINFO_XFER_ID
|
||
|
@@ -89,16 +92,6 @@ struct curl_handle {
|
||
|
/* The underlying curl handle. */
|
||
|
CURL *c;
|
||
|
|
||
|
- /* Index of this handle in the pool (for debugging). */
|
||
|
- size_t i;
|
||
|
-
|
||
|
- /* True if the handle is in use by a thread. */
|
||
|
- bool in_use;
|
||
|
-
|
||
|
- /* These fields are used/initialized when we create the handle. */
|
||
|
- bool accept_range;
|
||
|
- int64_t exportsize;
|
||
|
-
|
||
|
char errbuf[CURL_ERROR_SIZE];
|
||
|
|
||
|
/* Before doing a read or write operation, set these to point to the
|
||
|
@@ -111,8 +104,30 @@ struct curl_handle {
|
||
|
const char *read_buf;
|
||
|
uint32_t read_count;
|
||
|
|
||
|
+ /* This field is used by curl_get_size. */
|
||
|
+ bool accept_range;
|
||
|
+
|
||
|
/* Used by scripts.c */
|
||
|
struct curl_slist *headers_copy;
|
||
|
+
|
||
|
+ /* Used by pool.c */
|
||
|
+ struct command *cmd;
|
||
|
+};
|
||
|
+
|
||
|
+/* Asynchronous commands that can be sent to the pool thread. */
|
||
|
+enum command_type { EASY_HANDLE, STOP };
|
||
|
+struct command {
|
||
|
+ /* These fields are set by the caller. */
|
||
|
+ enum command_type type; /* command */
|
||
|
+ struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */
|
||
|
+
|
||
|
+ /* This field is set to a unique value by send_command_and_wait. */
|
||
|
+ uint64_t id; /* serial number */
|
||
|
+
|
||
|
+ /* These fields are used to signal back that the command finished. */
|
||
|
+ pthread_mutex_t mutex; /* completion mutex */
|
||
|
+ pthread_cond_t cond; /* completion condition */
|
||
|
+ CURLcode status; /* status code (CURLE_OK = succeeded) */
|
||
|
};
|
||
|
|
||
|
/* config.c */
|
||
|
@@ -127,8 +142,7 @@ extern void free_handle (struct curl_handle *);
|
||
|
extern int pool_get_ready (void);
|
||
|
extern int pool_after_fork (void);
|
||
|
extern void pool_unload (void);
|
||
|
-extern struct curl_handle *get_handle (void);
|
||
|
-extern void put_handle (struct curl_handle *ch);
|
||
|
+extern CURLcode send_command_and_wait (struct command *cmd);
|
||
|
|
||
|
/* scripts.c */
|
||
|
extern int do_scripts (struct curl_handle *ch);
|
||
|
diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
|
||
|
index eb2d330e..254951d1 100644
|
||
|
--- a/plugins/curl/pool.c
|
||
|
+++ b/plugins/curl/pool.c
|
||
|
@@ -30,11 +30,29 @@
|
||
|
* SUCH DAMAGE.
|
||
|
*/
|
||
|
|
||
|
-/* Curl handle pool.
|
||
|
+/* Worker thread which processes the curl multi interface.
|
||
|
*
|
||
|
- * To get a libcurl handle, call get_handle(). When you hold the
|
||
|
- * handle, it is yours exclusively to use. After you have finished
|
||
|
- * with the handle, put it back into the pool by calling put_handle().
|
||
|
+ * The main nbdkit threads (see curl.c) create curl easy handles
|
||
|
+ * initialized with the work they want to carry out. Note there is
|
||
|
+ * one easy handle per task (eg. per pread/pwrite request). The easy
|
||
|
+ * handles are not reused.
|
||
|
+ *
|
||
|
+ * The commands + optional easy handle are submitted to the worker
|
||
|
+ * thread over a self-pipe (it's easy to use a pipe here because the
|
||
|
+ * way curl multi works is it can listen on an extra fd, but not on
|
||
|
+ * anything else like a pthread condition). The curl multi performs
|
||
|
+ * the work of the outstanding easy handles.
|
||
|
+ *
|
||
|
+ * When an easy handle finishes work or errors, we retire the command
|
||
|
+ * by signalling back to the waiting nbdkit thread using a pthread
|
||
|
+ * condition.
|
||
|
+ *
|
||
|
+ * In my experiments, we're almost always I/O bound so I haven't seen
|
||
|
+ * any strong need to use more than one curl multi / worker thread,
|
||
|
+ * although it would be possible to add more in future.
|
||
|
+ *
|
||
|
+ * See also this extremely useful thread:
|
||
|
+ * https://curl.se/mail/lib-2019-03/0100.html
|
||
|
*/
|
||
|
|
||
|
#include <config.h>
|
||
|
@@ -45,6 +63,7 @@
|
||
|
#include <stdint.h>
|
||
|
#include <inttypes.h>
|
||
|
#include <string.h>
|
||
|
+#include <unistd.h>
|
||
|
#include <assert.h>
|
||
|
#include <pthread.h>
|
||
|
|
||
|
@@ -62,115 +81,321 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
|
||
|
|
||
|
unsigned connections = 4;
|
||
|
|
||
|
-/* This lock protects access to the curl_handles vector below. */
|
||
|
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
|
||
|
+/* Pipe used to notify background thread that a command is pending in
|
||
|
+ * the queue. A pointer to the 'struct command' is sent over the
|
||
|
+ * pipe.
|
||
|
+ */
|
||
|
+static int self_pipe[2] = { -1, -1 };
|
||
|
|
||
|
-/* List of curl handles. This is allocated dynamically as more
|
||
|
- * handles are requested. Currently it does not shrink. It may grow
|
||
|
- * up to 'connections' in length.
|
||
|
+/* The curl multi handle. */
|
||
|
+static CURLM *multi;
|
||
|
+
|
||
|
+/* List of running easy handles. We use this to find the curl_handle
|
||
|
+ * of a finished transfer, and to remove them all when cleaning up.
|
||
|
*/
|
||
|
DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
|
||
|
static curl_handle_list curl_handles = empty_vector;
|
||
|
|
||
|
-/* The condition is used when the curl handles vector is full and
|
||
|
- * we're waiting for a thread to put_handle.
|
||
|
- */
|
||
|
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
|
||
|
-static size_t in_use = 0, waiting = 0;
|
||
|
+static const char *
|
||
|
+command_type_to_string (enum command_type type)
|
||
|
+{
|
||
|
+ switch (type) {
|
||
|
+ case EASY_HANDLE: return "EASY_HANDLE";
|
||
|
+ case STOP: return "STOP";
|
||
|
+ default: abort ();
|
||
|
+ }
|
||
|
+}
|
||
|
|
||
|
int
|
||
|
pool_get_ready (void)
|
||
|
{
|
||
|
+ multi = curl_multi_init ();
|
||
|
+ if (multi == NULL) {
|
||
|
+ nbdkit_error ("curl_multi_init failed: %m");
|
||
|
+ return -1;
|
||
|
+ }
|
||
|
+
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
+/* Start and stop the background thread. */
|
||
|
+static pthread_t thread;
|
||
|
+static bool thread_running;
|
||
|
+static void *pool_worker (void *);
|
||
|
+
|
||
|
int
|
||
|
pool_after_fork (void)
|
||
|
{
|
||
|
+ int err;
|
||
|
+
|
||
|
+ if (pipe (self_pipe) == -1) {
|
||
|
+ nbdkit_error ("pipe: %m");
|
||
|
+ return -1;
|
||
|
+ }
|
||
|
+
|
||
|
+ /* Start the pool background thread where all the curl work is done. */
|
||
|
+ err = pthread_create (&thread, NULL, pool_worker, NULL);
|
||
|
+ if (err != 0) {
|
||
|
+ errno = err;
|
||
|
+ nbdkit_error ("pthread_create: %m");
|
||
|
+ return -1;
|
||
|
+ }
|
||
|
+ thread_running = true;
|
||
|
+
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
-/* Close and free all handles in the pool. */
|
||
|
+/* Unload the background thread. */
|
||
|
void
|
||
|
pool_unload (void)
|
||
|
{
|
||
|
- size_t i;
|
||
|
+ if (thread_running) {
|
||
|
+ /* Stop the background thread. */
|
||
|
+ struct command cmd = { .type = STOP };
|
||
|
+ send_command_and_wait (&cmd);
|
||
|
+ pthread_join (thread, NULL);
|
||
|
+ thread_running = false;
|
||
|
+ }
|
||
|
|
||
|
- if (curl_debug_pool)
|
||
|
- nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
|
||
|
- curl_handles.len);
|
||
|
+ if (self_pipe[0] >= 0) {
|
||
|
+ close (self_pipe[0]);
|
||
|
+ self_pipe[0] = -1;
|
||
|
+ }
|
||
|
+ if (self_pipe[1] >= 0) {
|
||
|
+ close (self_pipe[1]);
|
||
|
+ self_pipe[1] = -1;
|
||
|
+ }
|
||
|
|
||
|
- for (i = 0; i < curl_handles.len; ++i)
|
||
|
- free_handle (curl_handles.ptr[i]);
|
||
|
- curl_handle_list_reset (&curl_handles);
|
||
|
+ if (multi) {
|
||
|
+ size_t i;
|
||
|
+
|
||
|
+ /* Remove and free any easy handles in the multi. */
|
||
|
+ for (i = 0; i < curl_handles.len; ++i) {
|
||
|
+ curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
|
||
|
+ free_handle (curl_handles.ptr[i]);
|
||
|
+ }
|
||
|
+
|
||
|
+ curl_multi_cleanup (multi);
|
||
|
+ multi = NULL;
|
||
|
+ }
|
||
|
}
|
||
|
|
||
|
-/* Get a handle from the pool.
|
||
|
- *
|
||
|
- * It is owned exclusively by the caller until they call put_handle.
|
||
|
+/* Command queue. */
|
||
|
+static _Atomic uint64_t id; /* next command ID */
|
||
|
+
|
||
|
+/* Send command to the background thread and wait for completion.
|
||
|
+ * This is only called by one of the nbdkit threads.
|
||
|
*/
|
||
|
-struct curl_handle *
|
||
|
-get_handle (void)
|
||
|
+CURLcode
|
||
|
+send_command_and_wait (struct command *cmd)
|
||
|
{
|
||
|
- ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
|
||
|
- size_t i;
|
||
|
- struct curl_handle *ch;
|
||
|
+ cmd->id = id++;
|
||
|
|
||
|
- again:
|
||
|
- /* Look for a handle which is not in_use. */
|
||
|
- for (i = 0; i < curl_handles.len; ++i) {
|
||
|
- ch = curl_handles.ptr[i];
|
||
|
- if (!ch->in_use) {
|
||
|
- ch->in_use = true;
|
||
|
- in_use++;
|
||
|
- if (curl_debug_pool)
|
||
|
- nbdkit_debug ("get_handle: %zu", ch->i);
|
||
|
- return ch;
|
||
|
- }
|
||
|
- }
|
||
|
-
|
||
|
- /* If more connections are allowed, then allocate a new handle. */
|
||
|
- if (curl_handles.len < connections) {
|
||
|
- ch = allocate_handle ();
|
||
|
- if (ch == NULL)
|
||
|
- return NULL;
|
||
|
- if (curl_handle_list_append (&curl_handles, ch) == -1) {
|
||
|
- free_handle (ch);
|
||
|
- return NULL;
|
||
|
- }
|
||
|
- ch->i = curl_handles.len - 1;
|
||
|
- ch->in_use = true;
|
||
|
- in_use++;
|
||
|
- if (curl_debug_pool)
|
||
|
- nbdkit_debug ("get_handle: %zu", ch->i);
|
||
|
- return ch;
|
||
|
- }
|
||
|
-
|
||
|
- /* Otherwise we have run out of connections so we must wait until
|
||
|
- * another thread calls put_handle.
|
||
|
+ /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
|
||
|
+ * indicate that the command has not yet been completed and status
|
||
|
+ * set.
|
||
|
*/
|
||
|
- assert (in_use == connections);
|
||
|
- waiting++;
|
||
|
- while (in_use == connections)
|
||
|
- pthread_cond_wait (&cond, &lock);
|
||
|
- waiting--;
|
||
|
+ cmd->status = -1;
|
||
|
|
||
|
- goto again;
|
||
|
+ /* This will be used to signal command completion back to us. */
|
||
|
+ pthread_mutex_init (&cmd->mutex, NULL);
|
||
|
+ pthread_cond_init (&cmd->cond, NULL);
|
||
|
+
|
||
|
+ /* Send the command to the background thread. */
|
||
|
+ if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
|
||
|
+ abort ();
|
||
|
+
|
||
|
+ /* Wait for the command to be completed by the background thread. */
|
||
|
+ {
|
||
|
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
|
||
|
+ while (cmd->status == -1) /* for -1, see above */
|
||
|
+ pthread_cond_wait (&cmd->cond, &cmd->mutex);
|
||
|
+ }
|
||
|
+
|
||
|
+ pthread_mutex_destroy (&cmd->mutex);
|
||
|
+ pthread_cond_destroy (&cmd->cond);
|
||
|
+
|
||
|
+ /* Note the calling nbdkit thread must call nbdkit_error on error! */
|
||
|
+ return cmd->status;
|
||
|
}
|
||
|
|
||
|
-/* Return the handle to the pool. */
|
||
|
-void
|
||
|
-put_handle (struct curl_handle *ch)
|
||
|
+/* The background thread. */
|
||
|
+static struct command *process_multi_handle (void);
|
||
|
+static void check_for_finished_handles (void);
|
||
|
+static void retire_command (struct command *cmd, CURLcode code);
|
||
|
+static void do_easy_handle (struct command *cmd);
|
||
|
+
|
||
|
+static void *
|
||
|
+pool_worker (void *vp)
|
||
|
{
|
||
|
- ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
|
||
|
+ bool stop = false;
|
||
|
+
|
||
|
+ if (curl_debug_pool)
|
||
|
+ nbdkit_debug ("curl: background thread started");
|
||
|
+
|
||
|
+ while (!stop) {
|
||
|
+ struct command *cmd = NULL;
|
||
|
+
|
||
|
+ cmd = process_multi_handle ();
|
||
|
+ if (cmd == NULL)
|
||
|
+ continue; /* or die?? */
|
||
|
+
|
||
|
+ if (curl_debug_pool)
|
||
|
+ nbdkit_debug ("curl: dispatching %s command %" PRIu64,
|
||
|
+ command_type_to_string (cmd->type), cmd->id);
|
||
|
+
|
||
|
+ switch (cmd->type) {
|
||
|
+ case STOP:
|
||
|
+ stop = true;
|
||
|
+ retire_command (cmd, CURLE_OK);
|
||
|
+ break;
|
||
|
+
|
||
|
+ case EASY_HANDLE:
|
||
|
+ do_easy_handle (cmd);
|
||
|
+ break;
|
||
|
+ }
|
||
|
+ } /* while (!stop) */
|
||
|
|
||
|
if (curl_debug_pool)
|
||
|
- nbdkit_debug ("put_handle: %zu", ch->i);
|
||
|
+ nbdkit_debug ("curl: background thread stopped");
|
||
|
+
|
||
|
+ return NULL;
|
||
|
+}
|
||
|
+
|
||
|
+/* Process the multi handle, and look out for new commands. Returns
|
||
|
+ * the new command, or NULL if a curl multi call failed.
|
||
|
+ */
|
||
|
+static struct command *
|
||
|
+process_multi_handle (void)
|
||
|
+{
|
||
|
+ struct curl_waitfd extra_fds[1] =
|
||
|
+ { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
|
||
|
+ CURLMcode mc;
|
||
|
+ int numfds, running_handles;
|
||
|
+ struct command *cmd = NULL;
|
||
|
+#ifndef HAVE_CURL_MULTI_POLL
|
||
|
+ int repeats = 0;
|
||
|
+#endif
|
||
|
+
|
||
|
+ while (!cmd) {
|
||
|
+ /* Process the multi handle. */
|
||
|
+ mc = curl_multi_perform (multi, &running_handles);
|
||
|
+ if (mc != CURLM_OK) {
|
||
|
+ nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));
|
||
|
+ return NULL;
|
||
|
+ }
|
||
|
+
|
||
|
+ check_for_finished_handles ();
|
||
|
+
|
||
|
+#ifdef HAVE_CURL_MULTI_POLL
|
||
|
+ mc = curl_multi_poll (multi, extra_fds, 1, 1000000, &numfds);
|
||
|
+ if (mc != CURLM_OK) {
|
||
|
+ nbdkit_error ("curl_multi_poll: %s", curl_multi_strerror (mc));
|
||
|
+ return NULL;
|
||
|
+ }
|
||
|
+#else
|
||
|
+ /* This is the older curl_multi_wait function. For unclear
|
||
|
+ * reasons this often gets "stuck" in the nbdkit_nanosleep case
|
||
|
+ * below, wasting large amounts of time. Luckily the newer curl
|
||
|
+ * has curl_multi_poll, so this fallback is not compiled there.
|
||
|
+ */
|
||
|
+ mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
|
||
|
+ if (mc != CURLM_OK) {
|
||
|
+ nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
|
||
|
+ return NULL;
|
||
|
+ }
|
||
|
+
|
||
|
+ if (numfds == 0) {
|
||
|
+ repeats++;
|
||
|
+ if (repeats > 1)
|
||
|
+ nbdkit_nanosleep (1, 0);
|
||
|
+ continue;
|
||
|
+ }
|
||
|
+ else
|
||
|
+ repeats = 0;
|
||
|
+#endif
|
||
|
+
|
||
|
+ if (curl_debug_pool)
|
||
|
+ nbdkit_debug (
|
||
|
+#ifdef HAVE_CURL_MULTI_POLL
|
||
|
+ "curl_multi_poll"
|
||
|
+#else
|
||
|
+ "curl_multi_wait"
|
||
|
+#endif
|
||
|
+ ": running_handles=%d numfds=%d",
|
||
|
+ running_handles, numfds);
|
||
|
+
|
||
|
+ if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
|
||
|
+ /* There's a command waiting. */
|
||
|
+ if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
|
||
|
+ abort ();
|
||
|
+ }
|
||
|
+ }
|
||
|
+
|
||
|
+ return cmd;
|
||
|
+}
|
||
|
+
|
||
|
+/* This checks if any easy handles in the multi have
|
||
|
+ * finished and retires the associated commands.
|
||
|
+ */
|
||
|
+static void
|
||
|
+check_for_finished_handles (void)
|
||
|
+{
|
||
|
+ CURLMsg *msg;
|
||
|
+ int msgs_in_queue;
|
||
|
+
|
||
|
+ while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
|
||
|
+ size_t i;
|
||
|
+ struct curl_handle *ch = NULL;
|
||
|
+
|
||
|
+ if (msg->msg == CURLMSG_DONE) {
|
||
|
+ /* Find this curl_handle. */
|
||
|
+ for (i = 0; i < curl_handles.len; ++i) {
|
||
|
+ if (curl_handles.ptr[i]->c == msg->easy_handle) {
|
||
|
+ ch = curl_handles.ptr[i];
|
||
|
+ curl_handle_list_remove (&curl_handles, i);
|
||
|
+ break;
|
||
|
+ }
|
||
|
+ }
|
||
|
+ if (ch == NULL) abort ();
|
||
|
+ curl_multi_remove_handle (multi, ch->c);
|
||
|
+
|
||
|
+ retire_command (ch->cmd, msg->data.result);
|
||
|
+ }
|
||
|
+ }
|
||
|
+}
|
||
|
+
|
||
|
+/* Retire a command. status is a CURLcode. */
|
||
|
+static void
|
||
|
+retire_command (struct command *cmd, CURLcode status)
|
||
|
+{
|
||
|
+ if (curl_debug_pool)
|
||
|
+ nbdkit_debug ("curl: retiring %s command %" PRIu64,
|
||
|
+ command_type_to_string (cmd->type), cmd->id);
|
||
|
+
|
||
|
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
|
||
|
+ cmd->status = status;
|
||
|
+ pthread_cond_signal (&cmd->cond);
|
||
|
+}
|
||
|
+
|
||
|
+static void
|
||
|
+do_easy_handle (struct command *cmd)
|
||
|
+{
|
||
|
+ CURLMcode mc;
|
||
|
+
|
||
|
+ cmd->ch->cmd = cmd;
|
||
|
+
|
||
|
+ /* Add the handle to the multi. */
|
||
|
+ mc = curl_multi_add_handle (multi, cmd->ch->c);
|
||
|
+ if (mc != CURLM_OK) {
|
||
|
+ nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
|
||
|
+ goto err;
|
||
|
+ }
|
||
|
|
||
|
- ch->in_use = false;
|
||
|
- in_use--;
|
||
|
+ if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
|
||
|
+ goto err;
|
||
|
+ return;
|
||
|
|
||
|
- /* Signal the next thread which is waiting. */
|
||
|
- if (waiting > 0)
|
||
|
- pthread_cond_signal (&cond);
|
||
|
+ err:
|
||
|
+ retire_command (cmd, CURLE_OUT_OF_MEMORY);
|
||
|
}
|
||
|
--
|
||
|
2.39.3
|
||
|
|