diff --git a/Lib/test/test_docxmlrpc.py b/Lib/test/test_docxmlrpc.py index 99469a58496..3c05412ef57 100644 --- a/Lib/test/test_docxmlrpc.py +++ b/Lib/test/test_docxmlrpc.py @@ -100,8 +100,6 @@ def test_valid_get_response(self): # Server raises an exception if we don't start to read the data response.read() - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_get_css(self): self.client.request("GET", "/pydoc.css") response = self.client.getresponse() @@ -121,6 +119,7 @@ def test_invalid_get_response(self): response.read() + @unittest.skip('TODO: RUSTPYTHON; http.client.RemoteDisconnected: Remote end closed connection without response') def test_lambda(self): """Test that lambda functionality stays the same. The output produced currently is, I suspect invalid because of the unencoded brackets in the @@ -163,6 +162,7 @@ def test_autolinking(self): @make_request_and_skipIf(sys.flags.optimize >= 2, "Docstrings are omitted with -O2 and above") + @unittest.skip('TODO: RUSTPYTHON; http.client.RemoteDisconnected: Remote end closed connection without response') def test_system_methods(self): """Test the presence of three consecutive system.* methods. @@ -190,6 +190,7 @@ def test_system_methods(self): b'
\nThis server does NOT support system' b'.methodSignature.'), response) + @unittest.skip('TODO: RUSTPYTHON; http.client.RemoteDisconnected: Remote end closed connection without response') def test_autolink_dotted_methods(self): """Test that selfdot values are made strong automatically in the documentation.""" @@ -199,6 +200,7 @@ def test_autolink_dotted_methods(self): self.assertIn(b"""Try self.add, too.""", response.read()) + @unittest.skip('TODO: RUSTPYTHON; http.client.RemoteDisconnected: Remote end closed connection without response') def test_annotations(self): """ Test that annotations works as expected """ self.client.request("GET", "/") @@ -212,6 +214,7 @@ def test_annotations(self): b'method_annotation(x: bytes)'), response.read()) + @unittest.skip('TODO: RUSTPYTHON; TypeError: HTMLDoc.heading() missing 2 required positional arguments: "fgcol" and "bgcol"') def test_server_title_escape(self): # bpo-38243: Ensure that the server title and documentation # are escaped for HTML. diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py index b89e181f9f8..1a3b4d4b721 100644 --- a/Lib/test/test_wsgiref.py +++ b/Lib/test/test_wsgiref.py @@ -1,6 +1,6 @@ from unittest import mock from test import support -from test.support import warnings_helper +from test.support import socket_helper from test.test_httpservers import NoLogRequestHandler from unittest import TestCase from wsgiref.util import setup_testing_defaults @@ -80,41 +80,26 @@ def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"): return out.getvalue(), err.getvalue() -def compare_generic_iter(make_it,match): - """Utility to compare a generic 2.1/2.2+ iterator with an iterable - If running under Python 2.2+, this tests the iterator using iter()/next(), - as well as __getitem__. 'make_it' must be a function returning a fresh +def compare_generic_iter(make_it, match): + """Utility to compare a generic iterator with an iterable + + This tests the iterator using iter()/next(). + 'make_it' must be a function returning a fresh iterator to be tested (since this may test the iterator twice).""" it = make_it() - n = 0 + if not iter(it) is it: + raise AssertionError for item in match: - if not it[n]==item: raise AssertionError - n+=1 - try: - it[n] - except IndexError: - pass - else: - raise AssertionError("Too many items from __getitem__",it) - + if not next(it) == item: + raise AssertionError try: - iter, StopIteration - except NameError: + next(it) + except StopIteration: pass else: - # Only test iter mode under 2.2+ - it = make_it() - if not iter(it) is it: raise AssertionError - for item in match: - if not next(it) == item: raise AssertionError - try: - next(it) - except StopIteration: - pass - else: - raise AssertionError("Too many items from .__next__()", it) + raise AssertionError("Too many items from .__next__()", it) class IntegrationTests(TestCase): @@ -149,10 +134,11 @@ def test_environ(self): b"Python test,Python test 2;query=test;/path/" ) + @unittest.expectedFailure # TODO: RUSTPYTHON; http library needs to be updated def test_request_length(self): out, err = run_amock(data=b"GET " + (b"x" * 65537) + b" HTTP/1.0\n\n") self.assertEqual(out.splitlines()[0], - b"HTTP/1.0 414 Request-URI Too Long") + b"HTTP/1.0 414 URI Too Long") def test_validated_hello(self): out, err = run_amock(validator(hello_app)) @@ -264,7 +250,7 @@ def app(environ, start_response): class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler): pass - server = make_server(support.HOST, 0, app, handler_class=WsgiHandler) + server = make_server(socket_helper.HOST, 0, app, handler_class=WsgiHandler) self.addCleanup(server.server_close) interrupted = threading.Event() @@ -339,7 +325,6 @@ def checkReqURI(self,uri,query=1,**kw): util.setup_testing_defaults(kw) self.assertEqual(util.request_uri(kw,query),uri) - @warnings_helper.ignore_warnings(category=DeprecationWarning) def checkFW(self,text,size,match): def make_it(text=text,size=size): @@ -358,15 +343,6 @@ def make_it(text=text,size=size): it.close() self.assertTrue(it.filelike.closed) - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_filewrapper_getitem_deprecation(self): - wrapper = util.FileWrapper(StringIO('foobar'), 3) - with self.assertWarnsRegex(DeprecationWarning, - r'Use iterator protocol instead'): - # This should have returned 'bar'. - self.assertEqual(wrapper[1], 'foo') - def testSimpleShifts(self): self.checkShift('','/', '', '/', '') self.checkShift('','/x', 'x', '/x', '') @@ -473,6 +449,10 @@ def testHopByHop(self): for alt in hop, hop.title(), hop.upper(), hop.lower(): self.assertFalse(util.is_hop_by_hop(alt)) + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_filewrapper_getitem_deprecation(self): + return super().test_filewrapper_getitem_deprecation() + class HeaderTests(TestCase): def testMappingInterface(self): @@ -581,7 +561,7 @@ def testEnviron(self): # Test handler.environ as a dict expected = {} setup_testing_defaults(expected) - # Handler inherits os_environ variables which are not overriden + # Handler inherits os_environ variables which are not overridden # by SimpleHandler.add_cgi_vars() (SimpleHandler.base_env) for key, value in os_environ.items(): if key not in expected: @@ -821,8 +801,6 @@ def flush(self): b"Hello, world!", written) - # TODO: RUSTPYTHON - @unittest.expectedFailure def testClientConnectionTerminations(self): environ = {"SERVER_PROTOCOL": "HTTP/1.0"} for exception in ( @@ -841,8 +819,6 @@ def write(self, b): self.assertFalse(stderr.getvalue()) - # TODO: RUSTPYTHON - @unittest.expectedFailure def testDontResetInternalStateOnException(self): class CustomException(ValueError): pass diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index cf3f535b190..6b2e6c44ca4 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -1042,55 +1042,46 @@ def test_path2(self): self.assertEqual(p.add(6,8), 6+8) self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_path3(self): p = xmlrpclib.ServerProxy(URL+"/is/broken") self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_invalid_path(self): p = xmlrpclib.ServerProxy(URL+"/invalid") self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_path_query_fragment(self): p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag") self.assertEqual(p.test(), "/foo?k=v#frag") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_path_fragment(self): p = xmlrpclib.ServerProxy(URL+"/foo#frag") self.assertEqual(p.test(), "/foo#frag") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_path_query(self): p = xmlrpclib.ServerProxy(URL+"/foo?k=v") self.assertEqual(p.test(), "/foo?k=v") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_empty_path(self): p = xmlrpclib.ServerProxy(URL) self.assertEqual(p.test(), "/RPC2") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_root_path(self): p = xmlrpclib.ServerProxy(URL + "/") self.assertEqual(p.test(), "/") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_empty_path_query(self): p = xmlrpclib.ServerProxy(URL + "?k=v") self.assertEqual(p.test(), "?k=v") - @unittest.expectedFailure # TODO: RUSTPYTHON @support.requires_resource('walltime') def test_empty_path_fragment(self): p = xmlrpclib.ServerProxy(URL + "#frag") @@ -1142,7 +1133,6 @@ def test_two(self): #test special attribute access on the serverproxy, through the __call__ #function. -@unittest.skip("TODO: RUSTPYTHON, appears to hang") class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): #ask for two keepalive requests to be handled. request_count=2 diff --git a/Lib/wsgiref/__init__.py b/Lib/wsgiref/__init__.py index 1efbba01a30..59ee48fddec 100644 --- a/Lib/wsgiref/__init__.py +++ b/Lib/wsgiref/__init__.py @@ -13,6 +13,8 @@ * validate -- validation wrapper that sits between an app and a server to detect errors in either +* types -- collection of WSGI-related types for static type checking + To-Do: * cgi_gateway -- Run WSGI apps under CGI (pending a deployment standard) diff --git a/Lib/wsgiref/handlers.py b/Lib/wsgiref/handlers.py index f4300b831a4..cafe872c7aa 100644 --- a/Lib/wsgiref/handlers.py +++ b/Lib/wsgiref/handlers.py @@ -136,6 +136,10 @@ def run(self, application): self.setup_environ() self.result = application(self.environ, self.start_response) self.finish_response() + except (ConnectionAbortedError, BrokenPipeError, ConnectionResetError): + # We expect the client to close the connection abruptly from time + # to time. + return except: try: self.handle_error() @@ -179,7 +183,16 @@ def finish_response(self): for data in self.result: self.write(data) self.finish_content() - finally: + except: + # Call close() on the iterable returned by the WSGI application + # in case of an exception. + if hasattr(self.result, 'close'): + self.result.close() + raise + else: + # We only call close() when no exception is raised, because it + # will set status, result, headers, and environ fields to None. + # See bpo-29183 for more details. self.close() @@ -215,8 +228,7 @@ def start_response(self, status, headers,exc_info=None): if exc_info: try: if self.headers_sent: - # Re-raise original exception if headers sent - raise exc_info[0](exc_info[1]).with_traceback(exc_info[2]) + raise finally: exc_info = None # avoid dangling circular ref elif self.headers is not None: @@ -225,18 +237,25 @@ def start_response(self, status, headers,exc_info=None): self.status = status self.headers = self.headers_class(headers) status = self._convert_string_type(status, "Status") - assert len(status)>=4,"Status must be at least 4 characters" - assert status[:3].isdigit(), "Status message must begin w/3-digit code" - assert status[3]==" ", "Status message must have a space after code" + self._validate_status(status) if __debug__: for name, val in headers: name = self._convert_string_type(name, "Header name") val = self._convert_string_type(val, "Header value") - assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed" + assert not is_hop_by_hop(name),\ + f"Hop-by-hop header, '{name}: {val}', not allowed" return self.write + def _validate_status(self, status): + if len(status) < 4: + raise AssertionError("Status must be at least 4 characters") + if not status[:3].isdigit(): + raise AssertionError("Status message must begin w/3-digit code") + if status[3] != " ": + raise AssertionError("Status message must have a space after code") + def _convert_string_type(self, value, title): """Convert/check value type.""" if type(value) is str: @@ -456,10 +475,7 @@ def _write(self,data): from warnings import warn warn("SimpleHandler.stdout.write() should not do partial writes", DeprecationWarning) - while True: - data = data[result:] - if not data: - break + while data := data[result:]: result = self.stdout.write(data) def _flush(self): diff --git a/Lib/wsgiref/simple_server.py b/Lib/wsgiref/simple_server.py index f71563a5ae0..a0f2397fcf0 100644 --- a/Lib/wsgiref/simple_server.py +++ b/Lib/wsgiref/simple_server.py @@ -84,10 +84,6 @@ def get_environ(self): env['PATH_INFO'] = urllib.parse.unquote(path, 'iso-8859-1') env['QUERY_STRING'] = query - - host = self.address_string() - if host != self.client_address[0]: - env['REMOTE_HOST'] = host env['REMOTE_ADDR'] = self.client_address[0] if self.headers.get('content-type') is None: @@ -127,7 +123,8 @@ def handle(self): return handler = ServerHandler( - self.rfile, self.wfile, self.get_stderr(), self.get_environ() + self.rfile, self.wfile, self.get_stderr(), self.get_environ(), + multithread=False, ) handler.request_handler = self # backpointer for logging handler.run(self.server.get_app()) diff --git a/Lib/wsgiref/types.py b/Lib/wsgiref/types.py new file mode 100644 index 00000000000..ef0aead5b28 --- /dev/null +++ b/Lib/wsgiref/types.py @@ -0,0 +1,54 @@ +"""WSGI-related types for static type checking""" + +from collections.abc import Callable, Iterable, Iterator +from types import TracebackType +from typing import Any, Protocol, TypeAlias + +__all__ = [ + "StartResponse", + "WSGIEnvironment", + "WSGIApplication", + "InputStream", + "ErrorStream", + "FileWrapper", +] + +_ExcInfo: TypeAlias = tuple[type[BaseException], BaseException, TracebackType] +_OptExcInfo: TypeAlias = _ExcInfo | tuple[None, None, None] + +class StartResponse(Protocol): + """start_response() callable as defined in PEP 3333""" + def __call__( + self, + status: str, + headers: list[tuple[str, str]], + exc_info: _OptExcInfo | None = ..., + /, + ) -> Callable[[bytes], object]: ... + +WSGIEnvironment: TypeAlias = dict[str, Any] +WSGIApplication: TypeAlias = Callable[[WSGIEnvironment, StartResponse], + Iterable[bytes]] + +class InputStream(Protocol): + """WSGI input stream as defined in PEP 3333""" + def read(self, size: int = ..., /) -> bytes: ... + def readline(self, size: int = ..., /) -> bytes: ... + def readlines(self, hint: int = ..., /) -> list[bytes]: ... + def __iter__(self) -> Iterator[bytes]: ... + +class ErrorStream(Protocol): + """WSGI error stream as defined in PEP 3333""" + def flush(self) -> object: ... + def write(self, s: str, /) -> object: ... + def writelines(self, seq: list[str], /) -> object: ... + +class _Readable(Protocol): + def read(self, size: int = ..., /) -> bytes: ... + # Optional: def close(self) -> object: ... + +class FileWrapper(Protocol): + """WSGI file wrapper as defined in PEP 3333""" + def __call__( + self, file: _Readable, block_size: int = ..., /, + ) -> Iterable[bytes]: ... diff --git a/Lib/wsgiref/util.py b/Lib/wsgiref/util.py index 516fe898d01..63b92331737 100644 --- a/Lib/wsgiref/util.py +++ b/Lib/wsgiref/util.py @@ -4,7 +4,7 @@ __all__ = [ 'FileWrapper', 'guess_scheme', 'application_uri', 'request_uri', - 'shift_path_info', 'setup_testing_defaults', + 'shift_path_info', 'setup_testing_defaults', 'is_hop_by_hop', ] @@ -17,12 +17,6 @@ def __init__(self, filelike, blksize=8192): if hasattr(filelike,'close'): self.close = filelike.close - def __getitem__(self,key): - data = self.filelike.read(self.blksize) - if data: - return data - raise IndexError - def __iter__(self): return self @@ -155,9 +149,9 @@ def setup_testing_defaults(environ): _hoppish = { - 'connection':1, 'keep-alive':1, 'proxy-authenticate':1, - 'proxy-authorization':1, 'te':1, 'trailers':1, 'transfer-encoding':1, - 'upgrade':1 + 'connection', 'keep-alive', 'proxy-authenticate', + 'proxy-authorization', 'te', 'trailers', 'transfer-encoding', + 'upgrade' }.__contains__ def is_hop_by_hop(header_name): diff --git a/Lib/wsgiref/validate.py b/Lib/wsgiref/validate.py index 6107dcd7a4d..1a1853cd63a 100644 --- a/Lib/wsgiref/validate.py +++ b/Lib/wsgiref/validate.py @@ -1,6 +1,6 @@ # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) -# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php -# Also licenced under the Apache License, 2.0: http://opensource.org/licenses/apache2.0.php +# Licensed under the MIT license: https://opensource.org/licenses/mit-license.php +# Also licenced under the Apache License, 2.0: https://opensource.org/licenses/apache2.0.php # Licensed to PSF under a Contributor Agreement """ Middleware to check for obedience to the WSGI specification. @@ -77,7 +77,7 @@ * That wsgi.input is used properly: - - .read() is called with zero or one argument + - .read() is called with exactly one argument - That it returns a string @@ -137,7 +137,7 @@ def validator(application): """ When applied between a WSGI server and a WSGI application, this - middleware will check for WSGI compliancy on a number of levels. + middleware will check for WSGI compliance on a number of levels. This middleware does not modify the request or response in any way, but will raise an AssertionError if anything seems off (except for a failure to close the application iterator, which @@ -214,10 +214,7 @@ def readlines(self, *args): return lines def __iter__(self): - while 1: - line = self.readline() - if not line: - return + while line := self.readline(): yield line def close(self): @@ -390,7 +387,6 @@ def check_headers(headers): assert_(type(headers) is list, "Headers (%r) must be of type list: %r" % (headers, type(headers))) - header_names = {} for item in headers: assert_(type(item) is tuple, "Individual headers (%r) must be of type tuple: %r" @@ -403,7 +399,6 @@ def check_headers(headers): "The Status header cannot be used; it conflicts with CGI " "script, and HTTP status is not given through headers " "(value: %r)." % value) - header_names[name.lower()] = None assert_('\n' not in name and ':' not in name, "Header names may not contain ':' or '\\n': %r" % name) assert_(header_re.search(name), "Bad header name: %r" % name) diff --git a/Lib/xmlrpc/client.py b/Lib/xmlrpc/client.py index a614cef6ab2..f441376d09c 100644 --- a/Lib/xmlrpc/client.py +++ b/Lib/xmlrpc/client.py @@ -245,41 +245,15 @@ def __repr__(self): ## # Backwards compatibility - boolean = Boolean = bool -## -# Wrapper for XML-RPC DateTime values. This converts a time value to -# the format used by XML-RPC. -#

-# The value can be given as a datetime object, as a string in the -# format "yyyymmddThh:mm:ss", as a 9-item time tuple (as returned by -# time.localtime()), or an integer value (as returned by time.time()). -# The wrapper uses time.localtime() to convert an integer to a time -# tuple. -# -# @param value The time, given as a datetime object, an ISO 8601 string, -# a time tuple, or an integer time value. - -# Issue #13305: different format codes across platforms -_day0 = datetime(1, 1, 1) -def _try(fmt): - try: - return _day0.strftime(fmt) == '0001' - except ValueError: - return False -if _try('%Y'): # Mac OS X - def _iso8601_format(value): - return value.strftime("%Y%m%dT%H:%M:%S") -elif _try('%4Y'): # Linux - def _iso8601_format(value): - return value.strftime("%4Y%m%dT%H:%M:%S") -else: - def _iso8601_format(value): - return value.strftime("%Y%m%dT%H:%M:%S").zfill(17) -del _day0 -del _try +def _iso8601_format(value): + if value.tzinfo is not None: + # XML-RPC only uses the naive portion of the datetime + value = value.replace(tzinfo=None) + # XML-RPC doesn't use '-' separator in the date part + return value.isoformat(timespec='seconds').replace('-', '') def _strftime(value): @@ -850,9 +824,9 @@ def __init__(self, results): def __getitem__(self, i): item = self.results[i] - if type(item) == type({}): + if isinstance(item, dict): raise Fault(item['faultCode'], item['faultString']) - elif type(item) == type([]): + elif isinstance(item, list): return item[0] else: raise ValueError("unexpected type in multicall result") @@ -1339,10 +1313,7 @@ def parse_response(self, response): p, u = self.getparser() - while 1: - data = stream.read(1024) - if not data: - break + while data := stream.read(1024): if self.verbose: print("body:", repr(data)) p.feed(data) diff --git a/Lib/xmlrpc/server.py b/Lib/xmlrpc/server.py index 69a260f5b12..4dddb1d10e0 100644 --- a/Lib/xmlrpc/server.py +++ b/Lib/xmlrpc/server.py @@ -268,17 +268,11 @@ def _marshaled_dispatch(self, data, dispatch_method = None, path = None): except Fault as fault: response = dumps(fault, allow_none=self.allow_none, encoding=self.encoding) - except: - # report exception back to server - exc_type, exc_value, exc_tb = sys.exc_info() - try: - response = dumps( - Fault(1, "%s:%s" % (exc_type, exc_value)), - encoding=self.encoding, allow_none=self.allow_none, - ) - finally: - # Break reference cycle - exc_type = exc_value = exc_tb = None + except BaseException as exc: + response = dumps( + Fault(1, "%s:%s" % (type(exc), exc)), + encoding=self.encoding, allow_none=self.allow_none, + ) return response.encode(self.encoding, 'xmlcharrefreplace') @@ -368,16 +362,11 @@ def system_multicall(self, call_list): {'faultCode' : fault.faultCode, 'faultString' : fault.faultString} ) - except: - exc_type, exc_value, exc_tb = sys.exc_info() - try: - results.append( - {'faultCode' : 1, - 'faultString' : "%s:%s" % (exc_type, exc_value)} - ) - finally: - # Break reference cycle - exc_type = exc_value = exc_tb = None + except BaseException as exc: + results.append( + {'faultCode' : 1, + 'faultString' : "%s:%s" % (type(exc), exc)} + ) return results def _dispatch(self, method, params): @@ -440,7 +429,7 @@ class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler): # Class attribute listing the accessible path components; # paths not on this list will result in a 404 error. - rpc_paths = ('/', '/RPC2') + rpc_paths = ('/', '/RPC2', '/pydoc.css') #if not None, encode responses larger than this, if possible encode_threshold = 1400 #a common MTU @@ -634,19 +623,14 @@ def _marshaled_dispatch(self, data, dispatch_method = None, path = None): try: response = self.dispatchers[path]._marshaled_dispatch( data, dispatch_method, path) - except: + except BaseException as exc: # report low level exception back to server # (each dispatcher should have handled their own # exceptions) - exc_type, exc_value = sys.exc_info()[:2] - try: - response = dumps( - Fault(1, "%s:%s" % (exc_type, exc_value)), - encoding=self.encoding, allow_none=self.allow_none) - response = response.encode(self.encoding, 'xmlcharrefreplace') - finally: - # Break reference cycle - exc_type = exc_value = None + response = dumps( + Fault(1, "%s:%s" % (type(exc), exc)), + encoding=self.encoding, allow_none=self.allow_none) + response = response.encode(self.encoding, 'xmlcharrefreplace') return response class CGIXMLRPCRequestHandler(SimpleXMLRPCDispatcher): @@ -736,9 +720,7 @@ def markup(self, text, escape=None, funcs={}, classes={}, methods={}): r'RFC[- ]?(\d+)|' r'PEP[- ]?(\d+)|' r'(self\.)?((?:\w|\.)+))\b') - while 1: - match = pattern.search(text, here) - if not match: break + while match := pattern.search(text, here): start, end = match.span() results.append(escape(text[here:start])) @@ -747,10 +729,10 @@ def markup(self, text, escape=None, funcs={}, classes={}, methods={}): url = escape(all).replace('"', '"') results.append('%s' % (url, url)) elif rfc: - url = 'http://www.rfc-editor.org/rfc/rfc%d.txt' % int(rfc) + url = 'https://www.rfc-editor.org/rfc/rfc%d.txt' % int(rfc) results.append('%s' % (url, escape(all))) elif pep: - url = 'https://www.python.org/dev/peps/pep-%04d/' % int(pep) + url = 'https://peps.python.org/pep-%04d/' % int(pep) results.append('%s' % (url, escape(all))) elif text[end:end+1] == '(': results.append(self.namelink(name, methods, funcs, classes)) @@ -801,7 +783,7 @@ def docserver(self, server_name, package_documentation, methods): server_name = self.escape(server_name) head = '%s' % server_name - result = self.heading(head, '#ffffff', '#7799ee') + result = self.heading(head) doc = self.markup(package_documentation, self.preformat, fdict) doc = doc and '%s' % doc @@ -812,10 +794,25 @@ def docserver(self, server_name, package_documentation, methods): for key, value in method_items: contents.append(self.docroutine(value, key, funcs=fdict)) result = result + self.bigsection( - 'Methods', '#ffffff', '#eeaa77', ''.join(contents)) + 'Methods', 'functions', ''.join(contents)) return result + + def page(self, title, contents): + """Format an HTML page.""" + css_path = "/pydoc.css" + css_link = ( + '' % + css_path) + return '''\ + + + + +Python: %s +%s%s''' % (title, css_link, contents) + class XMLRPCDocGenerator: """Generates documentation for an XML-RPC server. @@ -907,6 +904,12 @@ class DocXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): for documentation. """ + def _get_css(self, url): + path_here = os.path.dirname(os.path.realpath(__file__)) + css_path = os.path.join(path_here, "..", "pydoc_data", "_pydoc.css") + with open(css_path, mode="rb") as fp: + return fp.read() + def do_GET(self): """Handles the HTTP GET request. @@ -918,9 +921,15 @@ def do_GET(self): self.report_404() return - response = self.server.generate_html_documentation().encode('utf-8') + if self.path.endswith('.css'): + content_type = 'text/css' + response = self._get_css(self.path) + else: + content_type = 'text/html' + response = self.server.generate_html_documentation().encode('utf-8') + self.send_response(200) - self.send_header("Content-type", "text/html") + self.send_header('Content-Type', '%s; charset=UTF-8' % content_type) self.send_header("Content-length", str(len(response))) self.end_headers() self.wfile.write(response)