You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
159 lines
6.3 KiB
159 lines
6.3 KiB
2 months ago
|
From c8ed63653cfdf4a55ad4cf26cb11e8938b227f13 Mon Sep 17 00:00:00 2001
|
||
|
From: Julien Rische <jrische@redhat.com>
|
||
|
Date: Mon, 18 Nov 2024 09:38:13 +0100
|
||
|
Subject: [PATCH] Use exponential backoff for connection retries
|
||
|
|
||
|
Calls to socket.connect() are non-blocking, hence all subsequent calls
|
||
|
to socket.sendall() will fail if the target KDC service is temporarily
|
||
|
or indefinitely unreachable. Since the kdcproxy task uses busy-looping,
|
||
|
it results in the journal being flooded with warning logs.
|
||
|
|
||
|
This commit introduces a per-socket reactivation delay which increases
|
||
|
exponentially as the number of retries is incremented, until the timeout is
|
||
|
reached (i.e. 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, ...).
|
||
|
|
||
|
Signed-off-by: Julien Rische <jrische@redhat.com>
|
||
|
(cherry picked from commit bac3c99c1b23487e38d965a79173ce9519e19c75)
|
||
|
---
|
||
|
kdcproxy/__init__.py | 63 +++++++++++++++++++++++++++++++++++++++++---
|
||
|
1 file changed, 60 insertions(+), 3 deletions(-)
|
||
|
|
||
|
diff --git a/kdcproxy/__init__.py b/kdcproxy/__init__.py
|
||
|
index 9bc7044..cfec0f3 100644
|
||
|
--- a/kdcproxy/__init__.py
|
||
|
+++ b/kdcproxy/__init__.py
|
||
|
@@ -60,6 +60,13 @@ class HTTPException(Exception):
|
||
|
return "%d %s" % (self.code, httplib.responses[self.code])
|
||
|
|
||
|
|
||
|
+class SocketException(Exception):
|
||
|
+
|
||
|
+ def __init__(self, message, sock):
|
||
|
+ super(Exception, self).__init__(message)
|
||
|
+ self.sockfno = sock.fileno()
|
||
|
+
|
||
|
+
|
||
|
class Application:
|
||
|
MAX_LENGTH = 128 * 1024
|
||
|
SOCKTYPES = {
|
||
|
@@ -67,10 +74,23 @@ class Application:
|
||
|
"udp": socket.SOCK_DGRAM,
|
||
|
}
|
||
|
|
||
|
+ def addr2socktypename(self, addr):
|
||
|
+ ret = None
|
||
|
+ for name in self.SOCKTYPES:
|
||
|
+ if self.SOCKTYPES[name] == addr[1]:
|
||
|
+ ret = name
|
||
|
+ break
|
||
|
+ return ret
|
||
|
+
|
||
|
def __init__(self):
|
||
|
self.__resolver = MetaResolver()
|
||
|
|
||
|
def __await_reply(self, pr, rsocks, wsocks, timeout):
|
||
|
+ starting_time = time.time()
|
||
|
+ send_error = None
|
||
|
+ recv_error = None
|
||
|
+ failing_sock = None
|
||
|
+ reactivations = {}
|
||
|
extra = 0
|
||
|
read_buffers = {}
|
||
|
while (timeout + extra) > time.time():
|
||
|
@@ -91,6 +111,12 @@ class Application:
|
||
|
pass
|
||
|
|
||
|
for sock in w:
|
||
|
+ # Fetch reactivation tuple:
|
||
|
+ # 1st element: reactivation index (-1 = first activation)
|
||
|
+ # 2nd element: planned reactivation time (0.0 = now)
|
||
|
+ (rn, rt) = reactivations.get(sock, (-1, 0.0))
|
||
|
+ if rt > time.time():
|
||
|
+ continue
|
||
|
try:
|
||
|
if self.sock_type(sock) == socket.SOCK_DGRAM:
|
||
|
# If we proxy over UDP, remove the 4-byte length
|
||
|
@@ -100,8 +126,13 @@ class Application:
|
||
|
sock.sendall(pr.request)
|
||
|
extra = 10 # New connections get 10 extra seconds
|
||
|
except Exception as e:
|
||
|
- logging.warning("Conection broken while writing (%s)", e)
|
||
|
+ send_error = e
|
||
|
+ failing_sock = sock
|
||
|
+ reactivations[sock] = (rn + 1,
|
||
|
+ time.time() + 2.0**(rn + 1) / 10)
|
||
|
continue
|
||
|
+ if sock in reactivations:
|
||
|
+ del reactivations[sock]
|
||
|
rsocks.append(sock)
|
||
|
wsocks.remove(sock)
|
||
|
|
||
|
@@ -109,7 +140,8 @@ class Application:
|
||
|
try:
|
||
|
reply = self.__handle_recv(sock, read_buffers)
|
||
|
except Exception as e:
|
||
|
- logging.warning("Connection broken while reading (%s)", e)
|
||
|
+ recv_error = e
|
||
|
+ failing_sock = sock
|
||
|
if self.sock_type(sock) == socket.SOCK_STREAM:
|
||
|
# Remove broken TCP socket from readers
|
||
|
rsocks.remove(sock)
|
||
|
@@ -117,6 +149,21 @@ class Application:
|
||
|
if reply is not None:
|
||
|
return reply
|
||
|
|
||
|
+ if reactivations:
|
||
|
+ raise SocketException("Timeout while sending packets after %.2fs "
|
||
|
+ "and %d tries: %s" % (
|
||
|
+ (timeout + extra) - starting_time,
|
||
|
+ sum(map(lambda r: r[0],
|
||
|
+ reactivations.values())),
|
||
|
+ send_error),
|
||
|
+ failing_sock)
|
||
|
+ elif recv_error is not None:
|
||
|
+ raise SocketException("Timeout while receiving packets after "
|
||
|
+ "%.2fs: %s" % (
|
||
|
+ (timeout + extra) - starting_time,
|
||
|
+ recv_error),
|
||
|
+ failing_sock)
|
||
|
+
|
||
|
return None
|
||
|
|
||
|
def __handle_recv(self, sock, read_buffers):
|
||
|
@@ -214,6 +261,7 @@ class Application:
|
||
|
reply = None
|
||
|
wsocks = []
|
||
|
rsocks = []
|
||
|
+ sockfno2addr = {}
|
||
|
for server in map(urlparse.urlparse, servers):
|
||
|
# Enforce valid, supported URIs
|
||
|
scheme = server.scheme.lower().split("+", 1)
|
||
|
@@ -260,6 +308,7 @@ class Application:
|
||
|
continue
|
||
|
except io.BlockingIOError:
|
||
|
pass
|
||
|
+ sockfno2addr[sock.fileno()] = addr
|
||
|
wsocks.append(sock)
|
||
|
|
||
|
# Resend packets to UDP servers
|
||
|
@@ -270,7 +319,15 @@ class Application:
|
||
|
|
||
|
# Call select()
|
||
|
timeout = time.time() + (15 if addr is None else 2)
|
||
|
- reply = self.__await_reply(pr, rsocks, wsocks, timeout)
|
||
|
+ try:
|
||
|
+ reply = self.__await_reply(pr, rsocks, wsocks, timeout)
|
||
|
+ except SocketException as e:
|
||
|
+ fail_addr = sockfno2addr[e.sockfno]
|
||
|
+ fail_socktype = self.addr2socktypename(fail_addr)
|
||
|
+ fail_ip = fail_addr[4][0]
|
||
|
+ fail_port = fail_addr[4][1]
|
||
|
+ logging.warning("Exchange with %s:[%s]:%d failed: %s",
|
||
|
+ fail_socktype, fail_ip, fail_port, e)
|
||
|
if reply is not None:
|
||
|
break
|
||
|
|
||
|
--
|
||
|
2.46.0
|
||
|
|