diff --git a/Lib/test/test_docxmlrpc.py b/Lib/test/test_docxmlrpc.py
index 99469a58496..3c05412ef57 100644
--- a/Lib/test/test_docxmlrpc.py
+++ b/Lib/test/test_docxmlrpc.py
@@ -100,8 +100,6 @@ def test_valid_get_response(self):
# Server raises an exception if we don't start to read the data
response.read()
- # TODO: RUSTPYTHON
- @unittest.expectedFailure
def test_get_css(self):
self.client.request("GET", "/pydoc.css")
response = self.client.getresponse()
@@ -121,6 +119,7 @@ def test_invalid_get_response(self):
response.read()
+ @unittest.skip('TODO: RUSTPYTHON; http.client.RemoteDisconnected: Remote end closed connection without response')
def test_lambda(self):
"""Test that lambda functionality stays the same. The output produced
currently is, I suspect invalid because of the unencoded brackets in the
@@ -163,6 +162,7 @@ def test_autolinking(self):
@make_request_and_skipIf(sys.flags.optimize >= 2,
"Docstrings are omitted with -O2 and above")
+ @unittest.skip('TODO: RUSTPYTHON; http.client.RemoteDisconnected: Remote end closed connection without response')
def test_system_methods(self):
"""Test the presence of three consecutive system.* methods.
@@ -190,6 +190,7 @@ def test_system_methods(self):
b'
\nThis server does NOT support system'
b'.methodSignature.'), response)
+ @unittest.skip('TODO: RUSTPYTHON; http.client.RemoteDisconnected: Remote end closed connection without response')
def test_autolink_dotted_methods(self):
"""Test that selfdot values are made strong automatically in the
documentation."""
@@ -199,6 +200,7 @@ def test_autolink_dotted_methods(self):
self.assertIn(b"""Try self.add, too.""",
response.read())
+ @unittest.skip('TODO: RUSTPYTHON; http.client.RemoteDisconnected: Remote end closed connection without response')
def test_annotations(self):
""" Test that annotations works as expected """
self.client.request("GET", "/")
@@ -212,6 +214,7 @@ def test_annotations(self):
b'method_annotation(x: bytes)'),
response.read())
+ @unittest.skip('TODO: RUSTPYTHON; TypeError: HTMLDoc.heading() missing 2 required positional arguments: "fgcol" and "bgcol"')
def test_server_title_escape(self):
# bpo-38243: Ensure that the server title and documentation
# are escaped for HTML.
diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py
index b89e181f9f8..1a3b4d4b721 100644
--- a/Lib/test/test_wsgiref.py
+++ b/Lib/test/test_wsgiref.py
@@ -1,6 +1,6 @@
from unittest import mock
from test import support
-from test.support import warnings_helper
+from test.support import socket_helper
from test.test_httpservers import NoLogRequestHandler
from unittest import TestCase
from wsgiref.util import setup_testing_defaults
@@ -80,41 +80,26 @@ def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
return out.getvalue(), err.getvalue()
-def compare_generic_iter(make_it,match):
- """Utility to compare a generic 2.1/2.2+ iterator with an iterable
- If running under Python 2.2+, this tests the iterator using iter()/next(),
- as well as __getitem__. 'make_it' must be a function returning a fresh
+def compare_generic_iter(make_it, match):
+ """Utility to compare a generic iterator with an iterable
+
+ This tests the iterator using iter()/next().
+ 'make_it' must be a function returning a fresh
iterator to be tested (since this may test the iterator twice)."""
it = make_it()
- n = 0
+ if not iter(it) is it:
+ raise AssertionError
for item in match:
- if not it[n]==item: raise AssertionError
- n+=1
- try:
- it[n]
- except IndexError:
- pass
- else:
- raise AssertionError("Too many items from __getitem__",it)
-
+ if not next(it) == item:
+ raise AssertionError
try:
- iter, StopIteration
- except NameError:
+ next(it)
+ except StopIteration:
pass
else:
- # Only test iter mode under 2.2+
- it = make_it()
- if not iter(it) is it: raise AssertionError
- for item in match:
- if not next(it) == item: raise AssertionError
- try:
- next(it)
- except StopIteration:
- pass
- else:
- raise AssertionError("Too many items from .__next__()", it)
+ raise AssertionError("Too many items from .__next__()", it)
class IntegrationTests(TestCase):
@@ -149,10 +134,11 @@ def test_environ(self):
b"Python test,Python test 2;query=test;/path/"
)
+ @unittest.expectedFailure # TODO: RUSTPYTHON; http library needs to be updated
def test_request_length(self):
out, err = run_amock(data=b"GET " + (b"x" * 65537) + b" HTTP/1.0\n\n")
self.assertEqual(out.splitlines()[0],
- b"HTTP/1.0 414 Request-URI Too Long")
+ b"HTTP/1.0 414 URI Too Long")
def test_validated_hello(self):
out, err = run_amock(validator(hello_app))
@@ -264,7 +250,7 @@ def app(environ, start_response):
class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
pass
- server = make_server(support.HOST, 0, app, handler_class=WsgiHandler)
+ server = make_server(socket_helper.HOST, 0, app, handler_class=WsgiHandler)
self.addCleanup(server.server_close)
interrupted = threading.Event()
@@ -339,7 +325,6 @@ def checkReqURI(self,uri,query=1,**kw):
util.setup_testing_defaults(kw)
self.assertEqual(util.request_uri(kw,query),uri)
- @warnings_helper.ignore_warnings(category=DeprecationWarning)
def checkFW(self,text,size,match):
def make_it(text=text,size=size):
@@ -358,15 +343,6 @@ def make_it(text=text,size=size):
it.close()
self.assertTrue(it.filelike.closed)
- # TODO: RUSTPYTHON
- @unittest.expectedFailure
- def test_filewrapper_getitem_deprecation(self):
- wrapper = util.FileWrapper(StringIO('foobar'), 3)
- with self.assertWarnsRegex(DeprecationWarning,
- r'Use iterator protocol instead'):
- # This should have returned 'bar'.
- self.assertEqual(wrapper[1], 'foo')
-
def testSimpleShifts(self):
self.checkShift('','/', '', '/', '')
self.checkShift('','/x', 'x', '/x', '')
@@ -473,6 +449,10 @@ def testHopByHop(self):
for alt in hop, hop.title(), hop.upper(), hop.lower():
self.assertFalse(util.is_hop_by_hop(alt))
+ @unittest.expectedFailure # TODO: RUSTPYTHON
+ def test_filewrapper_getitem_deprecation(self):
+ return super().test_filewrapper_getitem_deprecation()
+
class HeaderTests(TestCase):
def testMappingInterface(self):
@@ -581,7 +561,7 @@ def testEnviron(self):
# Test handler.environ as a dict
expected = {}
setup_testing_defaults(expected)
- # Handler inherits os_environ variables which are not overriden
+ # Handler inherits os_environ variables which are not overridden
# by SimpleHandler.add_cgi_vars() (SimpleHandler.base_env)
for key, value in os_environ.items():
if key not in expected:
@@ -821,8 +801,6 @@ def flush(self):
b"Hello, world!",
written)
- # TODO: RUSTPYTHON
- @unittest.expectedFailure
def testClientConnectionTerminations(self):
environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
for exception in (
@@ -841,8 +819,6 @@ def write(self, b):
self.assertFalse(stderr.getvalue())
- # TODO: RUSTPYTHON
- @unittest.expectedFailure
def testDontResetInternalStateOnException(self):
class CustomException(ValueError):
pass
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py
index cf3f535b190..6b2e6c44ca4 100644
--- a/Lib/test/test_xmlrpc.py
+++ b/Lib/test/test_xmlrpc.py
@@ -1042,55 +1042,46 @@ def test_path2(self):
self.assertEqual(p.add(6,8), 6+8)
self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
- @unittest.expectedFailure # TODO: RUSTPYTHON
@support.requires_resource('walltime')
def test_path3(self):
p = xmlrpclib.ServerProxy(URL+"/is/broken")
self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
- @unittest.expectedFailure # TODO: RUSTPYTHON
@support.requires_resource('walltime')
def test_invalid_path(self):
p = xmlrpclib.ServerProxy(URL+"/invalid")
self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
- @unittest.expectedFailure # TODO: RUSTPYTHON
@support.requires_resource('walltime')
def test_path_query_fragment(self):
p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag")
self.assertEqual(p.test(), "/foo?k=v#frag")
- @unittest.expectedFailure # TODO: RUSTPYTHON
@support.requires_resource('walltime')
def test_path_fragment(self):
p = xmlrpclib.ServerProxy(URL+"/foo#frag")
self.assertEqual(p.test(), "/foo#frag")
- @unittest.expectedFailure # TODO: RUSTPYTHON
@support.requires_resource('walltime')
def test_path_query(self):
p = xmlrpclib.ServerProxy(URL+"/foo?k=v")
self.assertEqual(p.test(), "/foo?k=v")
- @unittest.expectedFailure # TODO: RUSTPYTHON
@support.requires_resource('walltime')
def test_empty_path(self):
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p.test(), "/RPC2")
- @unittest.expectedFailure # TODO: RUSTPYTHON
@support.requires_resource('walltime')
def test_root_path(self):
p = xmlrpclib.ServerProxy(URL + "/")
self.assertEqual(p.test(), "/")
- @unittest.expectedFailure # TODO: RUSTPYTHON
@support.requires_resource('walltime')
def test_empty_path_query(self):
p = xmlrpclib.ServerProxy(URL + "?k=v")
self.assertEqual(p.test(), "?k=v")
- @unittest.expectedFailure # TODO: RUSTPYTHON
@support.requires_resource('walltime')
def test_empty_path_fragment(self):
p = xmlrpclib.ServerProxy(URL + "#frag")
@@ -1142,7 +1133,6 @@ def test_two(self):
#test special attribute access on the serverproxy, through the __call__
#function.
-@unittest.skip("TODO: RUSTPYTHON, appears to hang")
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
#ask for two keepalive requests to be handled.
request_count=2
diff --git a/Lib/wsgiref/__init__.py b/Lib/wsgiref/__init__.py
index 1efbba01a30..59ee48fddec 100644
--- a/Lib/wsgiref/__init__.py
+++ b/Lib/wsgiref/__init__.py
@@ -13,6 +13,8 @@
* validate -- validation wrapper that sits between an app and a server
to detect errors in either
+* types -- collection of WSGI-related types for static type checking
+
To-Do:
* cgi_gateway -- Run WSGI apps under CGI (pending a deployment standard)
diff --git a/Lib/wsgiref/handlers.py b/Lib/wsgiref/handlers.py
index f4300b831a4..cafe872c7aa 100644
--- a/Lib/wsgiref/handlers.py
+++ b/Lib/wsgiref/handlers.py
@@ -136,6 +136,10 @@ def run(self, application):
self.setup_environ()
self.result = application(self.environ, self.start_response)
self.finish_response()
+ except (ConnectionAbortedError, BrokenPipeError, ConnectionResetError):
+ # We expect the client to close the connection abruptly from time
+ # to time.
+ return
except:
try:
self.handle_error()
@@ -179,7 +183,16 @@ def finish_response(self):
for data in self.result:
self.write(data)
self.finish_content()
- finally:
+ except:
+ # Call close() on the iterable returned by the WSGI application
+ # in case of an exception.
+ if hasattr(self.result, 'close'):
+ self.result.close()
+ raise
+ else:
+ # We only call close() when no exception is raised, because it
+ # will set status, result, headers, and environ fields to None.
+ # See bpo-29183 for more details.
self.close()
@@ -215,8 +228,7 @@ def start_response(self, status, headers,exc_info=None):
if exc_info:
try:
if self.headers_sent:
- # Re-raise original exception if headers sent
- raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
+ raise
finally:
exc_info = None # avoid dangling circular ref
elif self.headers is not None:
@@ -225,18 +237,25 @@ def start_response(self, status, headers,exc_info=None):
self.status = status
self.headers = self.headers_class(headers)
status = self._convert_string_type(status, "Status")
- assert len(status)>=4,"Status must be at least 4 characters"
- assert status[:3].isdigit(), "Status message must begin w/3-digit code"
- assert status[3]==" ", "Status message must have a space after code"
+ self._validate_status(status)
if __debug__:
for name, val in headers:
name = self._convert_string_type(name, "Header name")
val = self._convert_string_type(val, "Header value")
- assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed"
+ assert not is_hop_by_hop(name),\
+ f"Hop-by-hop header, '{name}: {val}', not allowed"
return self.write
+ def _validate_status(self, status):
+ if len(status) < 4:
+ raise AssertionError("Status must be at least 4 characters")
+ if not status[:3].isdigit():
+ raise AssertionError("Status message must begin w/3-digit code")
+ if status[3] != " ":
+ raise AssertionError("Status message must have a space after code")
+
def _convert_string_type(self, value, title):
"""Convert/check value type."""
if type(value) is str:
@@ -456,10 +475,7 @@ def _write(self,data):
from warnings import warn
warn("SimpleHandler.stdout.write() should not do partial writes",
DeprecationWarning)
- while True:
- data = data[result:]
- if not data:
- break
+ while data := data[result:]:
result = self.stdout.write(data)
def _flush(self):
diff --git a/Lib/wsgiref/simple_server.py b/Lib/wsgiref/simple_server.py
index f71563a5ae0..a0f2397fcf0 100644
--- a/Lib/wsgiref/simple_server.py
+++ b/Lib/wsgiref/simple_server.py
@@ -84,10 +84,6 @@ def get_environ(self):
env['PATH_INFO'] = urllib.parse.unquote(path, 'iso-8859-1')
env['QUERY_STRING'] = query
-
- host = self.address_string()
- if host != self.client_address[0]:
- env['REMOTE_HOST'] = host
env['REMOTE_ADDR'] = self.client_address[0]
if self.headers.get('content-type') is None:
@@ -127,7 +123,8 @@ def handle(self):
return
handler = ServerHandler(
- self.rfile, self.wfile, self.get_stderr(), self.get_environ()
+ self.rfile, self.wfile, self.get_stderr(), self.get_environ(),
+ multithread=False,
)
handler.request_handler = self # backpointer for logging
handler.run(self.server.get_app())
diff --git a/Lib/wsgiref/types.py b/Lib/wsgiref/types.py
new file mode 100644
index 00000000000..ef0aead5b28
--- /dev/null
+++ b/Lib/wsgiref/types.py
@@ -0,0 +1,54 @@
+"""WSGI-related types for static type checking"""
+
+from collections.abc import Callable, Iterable, Iterator
+from types import TracebackType
+from typing import Any, Protocol, TypeAlias
+
+__all__ = [
+ "StartResponse",
+ "WSGIEnvironment",
+ "WSGIApplication",
+ "InputStream",
+ "ErrorStream",
+ "FileWrapper",
+]
+
+_ExcInfo: TypeAlias = tuple[type[BaseException], BaseException, TracebackType]
+_OptExcInfo: TypeAlias = _ExcInfo | tuple[None, None, None]
+
+class StartResponse(Protocol):
+ """start_response() callable as defined in PEP 3333"""
+ def __call__(
+ self,
+ status: str,
+ headers: list[tuple[str, str]],
+ exc_info: _OptExcInfo | None = ...,
+ /,
+ ) -> Callable[[bytes], object]: ...
+
+WSGIEnvironment: TypeAlias = dict[str, Any]
+WSGIApplication: TypeAlias = Callable[[WSGIEnvironment, StartResponse],
+ Iterable[bytes]]
+
+class InputStream(Protocol):
+ """WSGI input stream as defined in PEP 3333"""
+ def read(self, size: int = ..., /) -> bytes: ...
+ def readline(self, size: int = ..., /) -> bytes: ...
+ def readlines(self, hint: int = ..., /) -> list[bytes]: ...
+ def __iter__(self) -> Iterator[bytes]: ...
+
+class ErrorStream(Protocol):
+ """WSGI error stream as defined in PEP 3333"""
+ def flush(self) -> object: ...
+ def write(self, s: str, /) -> object: ...
+ def writelines(self, seq: list[str], /) -> object: ...
+
+class _Readable(Protocol):
+ def read(self, size: int = ..., /) -> bytes: ...
+ # Optional: def close(self) -> object: ...
+
+class FileWrapper(Protocol):
+ """WSGI file wrapper as defined in PEP 3333"""
+ def __call__(
+ self, file: _Readable, block_size: int = ..., /,
+ ) -> Iterable[bytes]: ...
diff --git a/Lib/wsgiref/util.py b/Lib/wsgiref/util.py
index 516fe898d01..63b92331737 100644
--- a/Lib/wsgiref/util.py
+++ b/Lib/wsgiref/util.py
@@ -4,7 +4,7 @@
__all__ = [
'FileWrapper', 'guess_scheme', 'application_uri', 'request_uri',
- 'shift_path_info', 'setup_testing_defaults',
+ 'shift_path_info', 'setup_testing_defaults', 'is_hop_by_hop',
]
@@ -17,12 +17,6 @@ def __init__(self, filelike, blksize=8192):
if hasattr(filelike,'close'):
self.close = filelike.close
- def __getitem__(self,key):
- data = self.filelike.read(self.blksize)
- if data:
- return data
- raise IndexError
-
def __iter__(self):
return self
@@ -155,9 +149,9 @@ def setup_testing_defaults(environ):
_hoppish = {
- 'connection':1, 'keep-alive':1, 'proxy-authenticate':1,
- 'proxy-authorization':1, 'te':1, 'trailers':1, 'transfer-encoding':1,
- 'upgrade':1
+ 'connection', 'keep-alive', 'proxy-authenticate',
+ 'proxy-authorization', 'te', 'trailers', 'transfer-encoding',
+ 'upgrade'
}.__contains__
def is_hop_by_hop(header_name):
diff --git a/Lib/wsgiref/validate.py b/Lib/wsgiref/validate.py
index 6107dcd7a4d..1a1853cd63a 100644
--- a/Lib/wsgiref/validate.py
+++ b/Lib/wsgiref/validate.py
@@ -1,6 +1,6 @@
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
-# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
-# Also licenced under the Apache License, 2.0: http://opensource.org/licenses/apache2.0.php
+# Licensed under the MIT license: https://opensource.org/licenses/mit-license.php
+# Also licenced under the Apache License, 2.0: https://opensource.org/licenses/apache2.0.php
# Licensed to PSF under a Contributor Agreement
"""
Middleware to check for obedience to the WSGI specification.
@@ -77,7 +77,7 @@
* That wsgi.input is used properly:
- - .read() is called with zero or one argument
+ - .read() is called with exactly one argument
- That it returns a string
@@ -137,7 +137,7 @@ def validator(application):
"""
When applied between a WSGI server and a WSGI application, this
- middleware will check for WSGI compliancy on a number of levels.
+ middleware will check for WSGI compliance on a number of levels.
This middleware does not modify the request or response in any
way, but will raise an AssertionError if anything seems off
(except for a failure to close the application iterator, which
@@ -214,10 +214,7 @@ def readlines(self, *args):
return lines
def __iter__(self):
- while 1:
- line = self.readline()
- if not line:
- return
+ while line := self.readline():
yield line
def close(self):
@@ -390,7 +387,6 @@ def check_headers(headers):
assert_(type(headers) is list,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
- header_names = {}
for item in headers:
assert_(type(item) is tuple,
"Individual headers (%r) must be of type tuple: %r"
@@ -403,7 +399,6 @@ def check_headers(headers):
"The Status header cannot be used; it conflicts with CGI "
"script, and HTTP status is not given through headers "
"(value: %r)." % value)
- header_names[name.lower()] = None
assert_('\n' not in name and ':' not in name,
"Header names may not contain ':' or '\\n': %r" % name)
assert_(header_re.search(name), "Bad header name: %r" % name)
diff --git a/Lib/xmlrpc/client.py b/Lib/xmlrpc/client.py
index a614cef6ab2..f441376d09c 100644
--- a/Lib/xmlrpc/client.py
+++ b/Lib/xmlrpc/client.py
@@ -245,41 +245,15 @@ def __repr__(self):
##
# Backwards compatibility
-
boolean = Boolean = bool
-##
-# Wrapper for XML-RPC DateTime values. This converts a time value to
-# the format used by XML-RPC.
-#
-# The value can be given as a datetime object, as a string in the -# format "yyyymmddThh:mm:ss", as a 9-item time tuple (as returned by -# time.localtime()), or an integer value (as returned by time.time()). -# The wrapper uses time.localtime() to convert an integer to a time -# tuple. -# -# @param value The time, given as a datetime object, an ISO 8601 string, -# a time tuple, or an integer time value. - -# Issue #13305: different format codes across platforms -_day0 = datetime(1, 1, 1) -def _try(fmt): - try: - return _day0.strftime(fmt) == '0001' - except ValueError: - return False -if _try('%Y'): # Mac OS X - def _iso8601_format(value): - return value.strftime("%Y%m%dT%H:%M:%S") -elif _try('%4Y'): # Linux - def _iso8601_format(value): - return value.strftime("%4Y%m%dT%H:%M:%S") -else: - def _iso8601_format(value): - return value.strftime("%Y%m%dT%H:%M:%S").zfill(17) -del _day0 -del _try +def _iso8601_format(value): + if value.tzinfo is not None: + # XML-RPC only uses the naive portion of the datetime + value = value.replace(tzinfo=None) + # XML-RPC doesn't use '-' separator in the date part + return value.isoformat(timespec='seconds').replace('-', '') def _strftime(value): @@ -850,9 +824,9 @@ def __init__(self, results): def __getitem__(self, i): item = self.results[i] - if type(item) == type({}): + if isinstance(item, dict): raise Fault(item['faultCode'], item['faultString']) - elif type(item) == type([]): + elif isinstance(item, list): return item[0] else: raise ValueError("unexpected type in multicall result") @@ -1339,10 +1313,7 @@ def parse_response(self, response): p, u = self.getparser() - while 1: - data = stream.read(1024) - if not data: - break + while data := stream.read(1024): if self.verbose: print("body:", repr(data)) p.feed(data) diff --git a/Lib/xmlrpc/server.py b/Lib/xmlrpc/server.py index 69a260f5b12..4dddb1d10e0 100644 --- a/Lib/xmlrpc/server.py +++ b/Lib/xmlrpc/server.py @@ -268,17 +268,11 @@ def _marshaled_dispatch(self, data, dispatch_method = None, path = None): except Fault as fault: response = dumps(fault, allow_none=self.allow_none, encoding=self.encoding) - except: - # report exception back to server - exc_type, exc_value, exc_tb = sys.exc_info() - try: - response = dumps( - Fault(1, "%s:%s" % (exc_type, exc_value)), - encoding=self.encoding, allow_none=self.allow_none, - ) - finally: - # Break reference cycle - exc_type = exc_value = exc_tb = None + except BaseException as exc: + response = dumps( + Fault(1, "%s:%s" % (type(exc), exc)), + encoding=self.encoding, allow_none=self.allow_none, + ) return response.encode(self.encoding, 'xmlcharrefreplace') @@ -368,16 +362,11 @@ def system_multicall(self, call_list): {'faultCode' : fault.faultCode, 'faultString' : fault.faultString} ) - except: - exc_type, exc_value, exc_tb = sys.exc_info() - try: - results.append( - {'faultCode' : 1, - 'faultString' : "%s:%s" % (exc_type, exc_value)} - ) - finally: - # Break reference cycle - exc_type = exc_value = exc_tb = None + except BaseException as exc: + results.append( + {'faultCode' : 1, + 'faultString' : "%s:%s" % (type(exc), exc)} + ) return results def _dispatch(self, method, params): @@ -440,7 +429,7 @@ class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler): # Class attribute listing the accessible path components; # paths not on this list will result in a 404 error. - rpc_paths = ('/', '/RPC2') + rpc_paths = ('/', '/RPC2', '/pydoc.css') #if not None, encode responses larger than this, if possible encode_threshold = 1400 #a common MTU @@ -634,19 +623,14 @@ def _marshaled_dispatch(self, data, dispatch_method = None, path = None): try: response = self.dispatchers[path]._marshaled_dispatch( data, dispatch_method, path) - except: + except BaseException as exc: # report low level exception back to server # (each dispatcher should have handled their own # exceptions) - exc_type, exc_value = sys.exc_info()[:2] - try: - response = dumps( - Fault(1, "%s:%s" % (exc_type, exc_value)), - encoding=self.encoding, allow_none=self.allow_none) - response = response.encode(self.encoding, 'xmlcharrefreplace') - finally: - # Break reference cycle - exc_type = exc_value = None + response = dumps( + Fault(1, "%s:%s" % (type(exc), exc)), + encoding=self.encoding, allow_none=self.allow_none) + response = response.encode(self.encoding, 'xmlcharrefreplace') return response class CGIXMLRPCRequestHandler(SimpleXMLRPCDispatcher): @@ -736,9 +720,7 @@ def markup(self, text, escape=None, funcs={}, classes={}, methods={}): r'RFC[- ]?(\d+)|' r'PEP[- ]?(\d+)|' r'(self\.)?((?:\w|\.)+))\b') - while 1: - match = pattern.search(text, here) - if not match: break + while match := pattern.search(text, here): start, end = match.span() results.append(escape(text[here:start])) @@ -747,10 +729,10 @@ def markup(self, text, escape=None, funcs={}, classes={}, methods={}): url = escape(all).replace('"', '"') results.append('%s' % (url, url)) elif rfc: - url = 'http://www.rfc-editor.org/rfc/rfc%d.txt' % int(rfc) + url = 'https://www.rfc-editor.org/rfc/rfc%d.txt' % int(rfc) results.append('%s' % (url, escape(all))) elif pep: - url = 'https://www.python.org/dev/peps/pep-%04d/' % int(pep) + url = 'https://peps.python.org/pep-%04d/' % int(pep) results.append('%s' % (url, escape(all))) elif text[end:end+1] == '(': results.append(self.namelink(name, methods, funcs, classes)) @@ -801,7 +783,7 @@ def docserver(self, server_name, package_documentation, methods): server_name = self.escape(server_name) head = '%s' % server_name - result = self.heading(head, '#ffffff', '#7799ee') + result = self.heading(head) doc = self.markup(package_documentation, self.preformat, fdict) doc = doc and '%s' % doc @@ -812,10 +794,25 @@ def docserver(self, server_name, package_documentation, methods): for key, value in method_items: contents.append(self.docroutine(value, key, funcs=fdict)) result = result + self.bigsection( - 'Methods', '#ffffff', '#eeaa77', ''.join(contents)) + 'Methods', 'functions', ''.join(contents)) return result + + def page(self, title, contents): + """Format an HTML page.""" + css_path = "/pydoc.css" + css_link = ( + '' % + css_path) + return '''\ + + +
+ +