From 85fc0dcf395bdb42c7952ea50751a6429c87cf74 Mon Sep 17 00:00:00 2001 From: Sergey Cherevko Date: Fri, 15 Sep 2023 13:05:29 +0300 Subject: [PATCH] psycopg2 get result async --- psycopg/connection.h | 1 + psycopg/connection_int.c | 26 ++++--- psycopg/cursor_type.c | 3 +- psycopg/green.c | 6 +- psycopg/pqpath.c | 163 +++++++++++++++++++++------------------ psycopg/pqpath.h | 3 +- tests/test_async.py | 40 +++++++++- tests/test_green.py | 35 ++++++++- 8 files changed, 183 insertions(+), 94 deletions(-) diff --git a/psycopg/connection.h b/psycopg/connection.h index 6b1f244..7ccd56c 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -108,6 +108,7 @@ struct connectionObject { * for a green connection. If NULL, the connection is idle. */ PyObject *async_cursor; int async_status; /* asynchronous execution status */ + PGresult *pgres; /* notice processing */ PyObject *notice_list; diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index a60c4a9..10447ef 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -891,11 +891,14 @@ _conn_poll_advance_write(connectionObject *self, int flush) /* Advance to the next state after a call to a pq_is_busy* function */ static int -_conn_poll_advance_read(connectionObject *self, int busy) +_conn_poll_advance_read(connectionObject *self) { - int res; + int res, busy; Dprintf("conn_poll: poll reading"); + + busy = pq_get_result_async(self); + switch (busy) { case 0: /* result is ready */ res = PSYCO_POLL_OK; @@ -909,7 +912,7 @@ _conn_poll_advance_read(connectionObject *self, int busy) res = PSYCO_POLL_ERROR; break; default: - Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy); + Dprintf("conn_poll: unexpected result from pq_get_result_async: %d", busy); res = PSYCO_POLL_ERROR; break; } @@ -935,21 +938,21 @@ _conn_poll_query(connectionObject *self) case ASYNC_READ: Dprintf("conn_poll: async_status = ASYNC_READ"); if (self->async) { - res = _conn_poll_advance_read(self, pq_is_busy(self)); + res = _conn_poll_advance_read(self); } else { /* we are a green connection being polled as result of a query. this means that our caller has the lock and we are being called from the callback. If we tried to acquire the lock now it would be a deadlock. */ - res = _conn_poll_advance_read(self, pq_is_busy_locked(self)); + res = _conn_poll_advance_read(self); } break; case ASYNC_DONE: Dprintf("conn_poll: async_status = ASYNC_DONE"); /* We haven't asked anything: just check for notifications. */ - res = _conn_poll_advance_read(self, pq_is_busy(self)); + res = _conn_poll_advance_read(self); break; default: @@ -972,7 +975,6 @@ static int _conn_poll_setup_async(connectionObject *self) { int res = PSYCO_POLL_ERROR; - PGresult *pgres; switch (self->status) { case CONN_STATUS_CONNECTING: @@ -1023,12 +1025,11 @@ _conn_poll_setup_async(connectionObject *self) res = _conn_poll_query(self); if (res == PSYCO_POLL_OK) { res = PSYCO_POLL_ERROR; - pgres = pq_get_last_result(self); - if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { + if (self->pgres == NULL || PQresultStatus(self->pgres) != PGRES_COMMAND_OK ) { PyErr_SetString(OperationalError, "can't set datestyle to ISO"); break; } - CLEARPGRES(pgres); + CLEARPGRES(self->pgres); Dprintf("conn_poll: status -> CONN_STATUS_READY"); self->status = CONN_STATUS_READY; @@ -1089,8 +1090,9 @@ conn_poll(connectionObject *self) } curs = (cursorObject *)py_curs; - CLEARPGRES(curs->pgres); - curs->pgres = pq_get_last_result(self); + PQclear(curs->pgres); + curs->pgres = self->pgres; + self->pgres = NULL; /* fetch the tuples (if there are any) and build the result. We * don't care if pq_fetch return 0 or 1, but if there was an error, diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index d73bc3a..1942052 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -49,7 +49,6 @@ static PyObject * psyco_curs_close(cursorObject *self) { - EXC_IF_ASYNC_IN_PROGRESS(self, close); if (self->closed) { goto exit; @@ -59,6 +58,8 @@ psyco_curs_close(cursorObject *self) char buffer[128]; PGTransactionStatusType status; + EXC_IF_ASYNC_IN_PROGRESS(self, close_named); + if (!self->query) { Dprintf("skipping named cursor close because unused"); goto close; diff --git a/psycopg/green.c b/psycopg/green.c index ab37b8b..be831a9 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -177,10 +177,12 @@ psyco_exec_green(connectionObject *conn, const char *command) goto end; } - /* Now we can read the data without fear of blocking. */ - result = pq_get_last_result(conn); + /* the result is now in the connection: take its ownership */ + result = conn->pgres; + conn->pgres = NULL; end: + CLEARPGRES(conn->pgres); conn->async_status = ASYNC_DONE; Py_CLEAR(conn->async_cursor); return result; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 204a6b0..c026682 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -842,80 +842,6 @@ exit: return rv; } - -/* pq_is_busy - consume input and return connection status - - a status of 1 means that a call to pq_fetch will block, while a status of 0 - means that there is data available to be collected. -1 means an error, the - exception will be set accordingly. - - this function locks the connection object - this function call Py_*_ALLOW_THREADS macros */ - -int -pq_is_busy(connectionObject *conn) -{ - int res; - Dprintf("pq_is_busy: consuming input"); - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(conn->lock)); - - if (PQconsumeInput(conn->pgconn) == 0) { - Dprintf("pq_is_busy: PQconsumeInput() failed"); - pthread_mutex_unlock(&(conn->lock)); - Py_BLOCK_THREADS; - - /* if the libpq says pgconn is lost, close the py conn */ - if (CONNECTION_BAD == PQstatus(conn->pgconn)) { - conn->closed = 2; - } - - PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); - return -1; - } - - res = PQisBusy(conn->pgconn); - - Py_BLOCK_THREADS; - conn_notifies_process(conn); - conn_notice_process(conn); - Py_UNBLOCK_THREADS; - - pthread_mutex_unlock(&(conn->lock)); - Py_END_ALLOW_THREADS; - - return res; -} - -/* pq_is_busy_locked - equivalent to pq_is_busy but we already have the lock - * - * The function should be called with the lock and holding the GIL. - */ - -int -pq_is_busy_locked(connectionObject *conn) -{ - Dprintf("pq_is_busy_locked: consuming input"); - - if (PQconsumeInput(conn->pgconn) == 0) { - Dprintf("pq_is_busy_locked: PQconsumeInput() failed"); - - /* if the libpq says pgconn is lost, close the py conn */ - if (CONNECTION_BAD == PQstatus(conn->pgconn)) { - conn->closed = 2; - } - - PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); - return -1; - } - - /* notices and notifies will be processed at the end of the loop we are in - * (async reading) by pq_fetch. */ - - return PQisBusy(conn->pgconn); -} - /* pq_flush - flush output and return connection status a status of 1 means that a some data is still pending to be flushed, while a @@ -1094,6 +1020,8 @@ pq_send_query(connectionObject *conn, const char *query) Dprintf("pq_send_query: sending ASYNC query:"); Dprintf(" %-.200s", query); + CLEARPGRES(conn->pgres); + if (0 == (rv = PQsendQuery(conn->pgconn, query))) { Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn)); } @@ -1998,3 +1926,90 @@ pq_fetch(cursorObject *curs, int no_result) return ex; } +/* pq_get_result_async - read an available result without blocking. + * + * Return 0 if the result is ready, 1 if it will block, -1 on error. + * The last result will be returned in pgres. + * + * The function should be called with the lock and holding the GIL. + */ + +RAISES_NEG int +pq_get_result_async(connectionObject *conn) +{ + int rv = -1; + + Dprintf("pq_get_result_async: calling PQconsumeInput()"); + if (PQconsumeInput(conn->pgconn) == 0) { + Dprintf("pq_get_result_async: PQconsumeInput() failed"); + + /* if the libpq says pgconn is lost, close the py conn */ + if (CONNECTION_BAD == PQstatus(conn->pgconn)) { + conn->closed = 2; + } + + PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); + goto exit; + } + + conn_notifies_process(conn); + conn_notice_process(conn); + + for (;;) { + int busy; + PGresult *res; + ExecStatusType status; + + Dprintf("pq_get_result_async: calling PQisBusy()"); + busy = PQisBusy(conn->pgconn); + + if (busy) { + /* try later */ + Dprintf("pq_get_result_async: PQisBusy() = 1"); + rv = 1; + goto exit; + } + + if (!(res = PQgetResult(conn->pgconn))) { + Dprintf("pq_get_result_async: got no result"); + /* the result is ready: it was the previously read one */ + rv = 0; + goto exit; + } + + status = PQresultStatus(res); + Dprintf("pq_get_result_async: got result %s", PQresStatus(status)); + + /* Store the result outside because we want to return the last non-null + * one and we may have to do it across poll calls. However if there is + * an error in the stream of results we want to handle the *first* + * error. So don't clobber it with the following ones. */ + if (conn->pgres && PQresultStatus(conn->pgres) == PGRES_FATAL_ERROR) { + Dprintf("previous pgres is error: discarding"); + PQclear(res); + } + else { + PQclear(conn->pgres); + conn->pgres = res; + } + + switch (status) { + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + case PGRES_COPY_BOTH: + /* After entering copy mode, libpq will make a phony + * PGresult for us every time we query for it, so we need to + * break out of this endless loop. */ + rv = 0; + goto exit; + + default: + /* keep on reading to check if there are other results or + * we have finished. */ + continue; + } + } + +exit: + return rv; +} diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 5cf2230..4299643 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -59,9 +59,8 @@ HIDDEN int pq_tpc_command_locked(connectionObject *conn, const char *cmd, const char *tid, PGresult **pgres, char **error, PyThreadState **tstate); -HIDDEN int pq_is_busy(connectionObject *conn); -HIDDEN int pq_is_busy_locked(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn); +RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn); HIDDEN void pq_clear_async(connectionObject *conn); RAISES_NEG HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg); diff --git a/tests/test_async.py b/tests/test_async.py index 4eb5e6a..ff3b1a7 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -99,7 +99,6 @@ class AsyncTests(ConnectingTestCase): self.assertEquals(cur.fetchone()[0], "a") @slow - @skip_before_postgres(8, 2) def test_async_callproc(self): cur = self.conn.cursor() cur.callproc("pg_sleep", (0.1, )) @@ -406,6 +405,15 @@ class AsyncTests(ConnectingTestCase): cur.execute("delete from table1") self.wait(cur) + def test_stop_on_first_error(self): + cur = self.conn.cursor() + cur.execute("select 1; select x; select 1/0; select 2") + self.assertRaises(psycopg2.ProgrammingError, self.wait, cur) + + cur.execute("select 1") + self.wait(cur) + self.assertEqual(cur.fetchone(), (1,)) + def test_error_two_cursors(self): cur = self.conn.cursor() cur2 = self.conn.cursor() @@ -450,6 +458,36 @@ class AsyncTests(ConnectingTestCase): else: self.fail("no exception raised") + @slow + def test_non_block_after_notification(self): + from select import select + + cur = self.conn.cursor() + cur.execute(""" + select 1; + do $$ + begin + raise notice 'hello'; + end + $$ language plpgsql; + select pg_sleep(1); + """) + + polls = 0 + while True: + state = self.conn.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_READ: + select([self.conn], [], [], 0.1) + elif state == psycopg2.extensions.POLL_WRITE: + select([], [self.conn], [], 0.1) + else: + raise Exception("Unexpected result from poll: %r", state) + polls += 1 + + self.assert_(polls >= 8, polls) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_green.py b/tests/test_green.py index 8c1c20c..c8a24d6 100755 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -22,12 +22,14 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import select import unittest import psycopg2 import psycopg2.extensions import psycopg2.extras from testutils import ConnectingTestCase, slow +from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE class ConnectionStub(object): @@ -55,10 +57,10 @@ class GreenTestCase(ConnectingTestCase): ConnectingTestCase.tearDown(self) psycopg2.extensions.set_wait_callback(self._cb) - def set_stub_wait_callback(self, conn): + def set_stub_wait_callback(self, conn, cb=None): stub = ConnectionStub(conn) psycopg2.extensions.set_wait_callback( - lambda conn: psycopg2.extras.wait_select(stub)) + lambda conn: (cb or psycopg2.extras.wait_select)(stub)) return stub @slow @@ -111,6 +113,35 @@ class GreenTestCase(ConnectingTestCase): curs.execute("select 1") self.assertEqual(curs.fetchone()[0], 1) + @slow + def test_non_block_after_notification(self): + def wait(conn): + while 1: + state = conn.poll() + if state == POLL_OK: + break + elif state == POLL_READ: + select.select([conn.fileno()], [], [], 0.1) + elif state == POLL_WRITE: + select.select([], [conn.fileno()], [], 0.1) + else: + raise conn.OperationalError("bad state from poll: %s" % state) + + stub = self.set_stub_wait_callback(self.conn, wait) + cur = self.conn.cursor() + cur.execute(""" + select 1; + do $$ + begin + raise notice 'hello'; + end + $$ language plpgsql; + select pg_sleep(1); + """) + + polls = stub.polls.count(POLL_READ) + self.assert_(polls > 8, polls) + class CallbackErrorTestCase(ConnectingTestCase): def setUp(self): -- 2.41.0