From 7e1bd0f760555c9c565a414c3779269c059acc4b Mon Sep 17 00:00:00 2001 From: ShaharNaveh <50263213+ShaharNaveh@users.noreply.github.com> Date: Tue, 2 Jun 2026 14:07:21 +0300 Subject: [PATCH 1/5] Update thread related tests to 3.14.5 --- Lib/test/test_thread.py | 224 ++++++++++++++++-- Lib/test/test_thread_local_bytecode.py | 198 ++++++++++++++++ Lib/test/test_threadedtempfile.py | 7 +- Lib/test/test_threading.py | 316 ++++++++++++++++++++----- Lib/test/test_threading_local.py | 12 +- 5 files changed, 679 insertions(+), 78 deletions(-) create mode 100644 Lib/test/test_thread_local_bytecode.py diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 4ae8a833b99..ac924728feb 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -5,13 +5,15 @@ from test.support import threading_helper import _thread as thread import time +import warnings import weakref from test import lock_tests +threading_helper.requires_working_threading(module=True) + NUMTASKS = 10 NUMTRIPS = 3 -POLL_SLEEP = 0.010 # seconds = 10 ms _print_mutex = thread.allocate_lock() @@ -74,6 +76,14 @@ def test_stack_size(self): thread.stack_size(0) self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") + with self.assertRaises(ValueError): + # 123 bytes is too small + thread.stack_size(123) + + with self.assertRaises(ValueError): + # size must be positive + thread.stack_size(-4096) + @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix') def test_nt_and_posix_stack_size(self): try: @@ -119,19 +129,24 @@ def task(): with threading_helper.wait_threads_exit(): thread.start_new_thread(task, ()) - while not started: - time.sleep(POLL_SLEEP) + for _ in support.sleeping_retry(support.LONG_TIMEOUT): + if started: + break self.assertEqual(thread._count(), orig + 1) + # Allow the task to finish. mut.release() + # The only reliable way to be sure that the thread ended from the - # interpreter's point of view is to wait for the function object to be - # destroyed. + # interpreter's point of view is to wait for the function object to + # be destroyed. done = [] wr = weakref.ref(task, lambda _: done.append(None)) del task - while not done: - time.sleep(POLL_SLEEP) + + for _ in support.sleeping_retry(support.LONG_TIMEOUT): + if done: + break support.gc_collect() # For PyPy or other GCs. self.assertEqual(thread._count(), orig) @@ -148,11 +163,188 @@ def task(): started.acquire() self.assertEqual(str(cm.unraisable.exc_value), "task failed") - self.assertIs(cm.unraisable.object, task) + self.assertIsNone(cm.unraisable.object) self.assertEqual(cm.unraisable.err_msg, - "Exception ignored in thread started by") + f"Exception ignored in thread started by {task!r}") self.assertIsNotNone(cm.unraisable.exc_traceback) + def test_join_thread(self): + finished = [] + + def task(): + time.sleep(0.05) + finished.append(thread.get_ident()) + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handle.join() + self.assertEqual(len(finished), 1) + self.assertEqual(handle.ident, finished[0]) + + def test_join_thread_already_exited(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + time.sleep(0.05) + handle.join() + + def test_join_several_times(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handle.join() + # Subsequent join() calls should succeed + handle.join() + + def test_joinable_not_joined(self): + handle_destroyed = thread.allocate_lock() + handle_destroyed.acquire() + + def task(): + handle_destroyed.acquire() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + del handle + handle_destroyed.release() + + def test_join_from_self(self): + errors = [] + handles = [] + start_joinable_thread_returned = thread.allocate_lock() + start_joinable_thread_returned.acquire() + task_tried_to_join = thread.allocate_lock() + task_tried_to_join.acquire() + + def task(): + start_joinable_thread_returned.acquire() + try: + handles[0].join() + except Exception as e: + errors.append(e) + finally: + task_tried_to_join.release() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handles.append(handle) + start_joinable_thread_returned.release() + # Can still join after joining failed in other thread + task_tried_to_join.acquire() + handle.join() + + assert len(errors) == 1 + with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): + raise errors[0] + + def test_join_then_self_join(self): + # make sure we can't deadlock in the following scenario with + # threads t0 and t1 (see comment in `ThreadHandle_join()` for more + # details): + # + # - t0 joins t1 + # - t1 self joins + def make_lock(): + lock = thread.allocate_lock() + lock.acquire() + return lock + + error = None + self_joiner_handle = None + self_joiner_started = make_lock() + self_joiner_barrier = make_lock() + def self_joiner(): + nonlocal error + + self_joiner_started.release() + self_joiner_barrier.acquire() + + try: + self_joiner_handle.join() + except Exception as e: + error = e + + joiner_started = make_lock() + def joiner(): + joiner_started.release() + self_joiner_handle.join() + + with threading_helper.wait_threads_exit(): + self_joiner_handle = thread.start_joinable_thread(self_joiner) + # Wait for the self-joining thread to start + self_joiner_started.acquire() + + # Start the thread that joins the self-joiner + joiner_handle = thread.start_joinable_thread(joiner) + + # Wait for the joiner to start + joiner_started.acquire() + + # Not great, but I don't think there's a deterministic way to make + # sure that the self-joining thread has been joined. + time.sleep(0.1) + + # Unblock the self-joiner + self_joiner_barrier.release() + + self_joiner_handle.join() + joiner_handle.join() + + with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): + raise error + + def test_join_with_timeout(self): + lock = thread.allocate_lock() + lock.acquire() + + def thr(): + lock.acquire() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(thr) + handle.join(0.1) + self.assertFalse(handle.is_done()) + lock.release() + handle.join() + self.assertTrue(handle.is_done()) + + def test_join_unstarted(self): + handle = thread._ThreadHandle() + with self.assertRaisesRegex(RuntimeError, "thread not started"): + handle.join() + + def test_set_done_unstarted(self): + handle = thread._ThreadHandle() + with self.assertRaisesRegex(RuntimeError, "thread not started"): + handle._set_done() + + def test_start_duplicate_handle(self): + lock = thread.allocate_lock() + lock.acquire() + + def func(): + lock.acquire() + + handle = thread._ThreadHandle() + with threading_helper.wait_threads_exit(): + thread.start_joinable_thread(func, handle=handle) + with self.assertRaisesRegex(RuntimeError, "thread already started"): + thread.start_joinable_thread(func, handle=handle) + lock.release() + handle.join() + + def test_start_with_none_handle(self): + def func(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(func, handle=None) + handle.join() + class Barrier: def __init__(self, num_threads): @@ -224,7 +416,7 @@ class TestForkInThread(unittest.TestCase): def setUp(self): self.read_fd, self.write_fd = os.pipe() - @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork') + @support.requires_fork() @threading_helper.reap_threads def test_forkinthread(self): pid = None @@ -232,11 +424,13 @@ def test_forkinthread(self): def fork_thread(read_fd, write_fd): nonlocal pid - # fork in a thread - pid = os.fork() - if pid: - # parent process - return + # Ignore the warning about fork with threads. + with warnings.catch_warnings(category=DeprecationWarning, + action="ignore"): + # fork in a thread (DANGER, undefined per POSIX) + if (pid := os.fork()): + # parent process + return # child process try: diff --git a/Lib/test/test_thread_local_bytecode.py b/Lib/test/test_thread_local_bytecode.py new file mode 100644 index 00000000000..d5c56db8d5d --- /dev/null +++ b/Lib/test/test_thread_local_bytecode.py @@ -0,0 +1,198 @@ +"""Tests for thread-local bytecode.""" +import textwrap +import unittest + +from test import support +from test.support import cpython_only, import_helper, requires_specialization_ft +from test.support.script_helper import assert_python_ok +from test.support.threading_helper import requires_working_threading + +# Skip this test if the _testinternalcapi module isn't available +_testinternalcapi = import_helper.import_module("_testinternalcapi") + + +@cpython_only +@requires_working_threading() +@unittest.skipUnless(support.Py_GIL_DISABLED, "only in free-threaded builds") +class TLBCTests(unittest.TestCase): + @requires_specialization_ft + def test_new_threads_start_with_unspecialized_code(self): + code = textwrap.dedent(""" + import dis + import queue + import threading + + from _testinternalcapi import get_tlbc + + def all_opnames(bc): + return {i.opname for i in dis._get_instructions_bytes(bc)} + + def f(a, b, q=None): + if q is not None: + q.put(get_tlbc(f)) + return a + b + + for _ in range(100): + # specialize + f(1, 2) + + q = queue.Queue() + t = threading.Thread(target=f, args=('a', 'b', q)) + t.start() + t.join() + + assert "BINARY_OP_ADD_INT" in all_opnames(get_tlbc(f)) + assert "BINARY_OP_ADD_INT" not in all_opnames(q.get()) + """) + assert_python_ok("-X", "tlbc=1", "-c", code) + + @requires_specialization_ft + def test_threads_specialize_independently(self): + code = textwrap.dedent(""" + import dis + import queue + import threading + + from _testinternalcapi import get_tlbc + + def all_opnames(bc): + return {i.opname for i in dis._get_instructions_bytes(bc)} + + def f(a, b): + return a + b + + def g(a, b, q=None): + for _ in range(100): + f(a, b) + if q is not None: + q.put(get_tlbc(f)) + + # specialize in main thread + g(1, 2) + + # specialize in other thread + q = queue.Queue() + t = threading.Thread(target=g, args=('a', 'b', q)) + t.start() + t.join() + + assert "BINARY_OP_ADD_INT" in all_opnames(get_tlbc(f)) + t_opnames = all_opnames(q.get()) + assert "BINARY_OP_ADD_INT" not in t_opnames + assert "BINARY_OP_ADD_UNICODE" in t_opnames + """) + assert_python_ok("-X", "tlbc=1", "-c", code) + + def test_reuse_tlbc_across_threads_different_lifetimes(self): + code = textwrap.dedent(""" + import queue + import threading + + from _testinternalcapi import get_tlbc_id + + def f(a, b, q=None): + if q is not None: + q.put(get_tlbc_id(f)) + return a + b + + q = queue.Queue() + tlbc_ids = [] + for _ in range(3): + t = threading.Thread(target=f, args=('a', 'b', q)) + t.start() + t.join() + tlbc_ids.append(q.get()) + + assert tlbc_ids[0] == tlbc_ids[1] + assert tlbc_ids[1] == tlbc_ids[2] + """) + assert_python_ok("-X", "tlbc=1", "-c", code) + + @support.skip_if_sanitizer("gh-129752: data race on adaptive counter", thread=True) + def test_no_copies_if_tlbc_disabled(self): + code = textwrap.dedent(""" + import queue + import threading + + from _testinternalcapi import get_tlbc_id + + def f(a, b, q=None): + if q is not None: + q.put(get_tlbc_id(f)) + return a + b + + q = queue.Queue() + threads = [] + for _ in range(3): + t = threading.Thread(target=f, args=('a', 'b', q)) + t.start() + threads.append(t) + + tlbc_ids = [] + for t in threads: + t.join() + tlbc_ids.append(q.get()) + + main_tlbc_id = get_tlbc_id(f) + assert main_tlbc_id is not None + assert tlbc_ids[0] == main_tlbc_id + assert tlbc_ids[1] == main_tlbc_id + assert tlbc_ids[2] == main_tlbc_id + """) + assert_python_ok("-X", "tlbc=0", "-c", code) + + def test_no_specialization_if_tlbc_disabled(self): + code = textwrap.dedent(""" + import dis + import queue + import threading + + from _testinternalcapi import get_tlbc + + def all_opnames(f): + bc = get_tlbc(f) + return {i.opname for i in dis._get_instructions_bytes(bc)} + + def f(a, b): + return a + b + + for _ in range(100): + f(1, 2) + + assert "BINARY_OP_ADD_INT" not in all_opnames(f) + """) + assert_python_ok("-X", "tlbc=0", "-c", code) + + def test_generator_throw(self): + code = textwrap.dedent(""" + import queue + import threading + + from _testinternalcapi import get_tlbc_id + + def g(): + try: + yield + except: + yield get_tlbc_id(g) + + def f(q): + gen = g() + next(gen) + q.put(gen.throw(ValueError)) + + q = queue.Queue() + t = threading.Thread(target=f, args=(q,)) + t.start() + t.join() + + gen = g() + next(gen) + main_id = gen.throw(ValueError) + assert main_id != q.get() + """) + assert_python_ok("-X", "tlbc=1", "-c", code) + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_threadedtempfile.py b/Lib/test/test_threadedtempfile.py index 12feb465dbe..acb427b0c78 100644 --- a/Lib/test/test_threadedtempfile.py +++ b/Lib/test/test_threadedtempfile.py @@ -15,14 +15,14 @@ import tempfile +from test import support from test.support import threading_helper import unittest import io import threading from traceback import print_exc -import sys # XXX: RUSTPYTHON - +threading_helper.requires_working_threading(module=True) NUM_THREADS = 20 FILES_PER_THREAD = 50 @@ -50,7 +50,8 @@ def run(self): class ThreadedTempFileTest(unittest.TestCase): - def test_main(self): + @support.bigmemtest(size=NUM_THREADS, memuse=60*2**20, dry_run=False) + def test_main(self, size): threads = [TempFileGreedy() for i in range(NUM_THREADS)] with threading_helper.start_threads(threads, startEvent.set): pass diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 17693ae093f..be1f72b30e3 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -5,7 +5,7 @@ import test.support from test.support import threading_helper, requires_subprocess, requires_gil_enabled from test.support import verbose, cpython_only, os_helper -from test.support.import_helper import import_module +from test.support.import_helper import ensure_lazy_imports, import_module from test.support.script_helper import assert_python_ok, assert_python_failure from test.support import force_not_colorized @@ -28,7 +28,7 @@ from test import support try: - from test.support import interpreters + from concurrent import interpreters except ImportError: interpreters = None @@ -120,6 +120,10 @@ def tearDown(self): class ThreadTests(BaseTestCase): maxDiff = 9999 + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("threading", {"functools", "warnings"}) + @cpython_only def test_name(self): def func(): pass @@ -319,7 +323,6 @@ def f(mutex): # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) # exposed at the Python level. This test relies on ctypes to get at it. - @cpython_only def test_PyThreadState_SetAsyncExc(self): ctypes = import_module("ctypes") @@ -408,7 +411,7 @@ def run(self): t.join() # else the thread is still running, and we have no way to kill it - @unittest.skip('TODO: RUSTPYTHON; threading._start_new_thread not exposed') + @unittest.skip("TODO: RUSTPYTHON; threading._start_new_thread not exposed") def test_limbo_cleanup(self): # Issue 7481: Failure to start thread should cleanup the limbo map. def fail_new_thread(*args, **kwargs): @@ -424,7 +427,7 @@ def fail_new_thread(*args, **kwargs): finally: threading._start_joinable_thread = _start_joinable_thread - @unittest.expectedFailure # TODO: RUSTPYTHON; ctypes.pythonapi is not supported + @unittest.expectedFailure # TODO: RUSTPYTHON; ctypes.pythonapi is not supported def test_finalize_running_thread(self): # Issue 1402: the PyGILState_Ensure / _Release functions may be called # very late on python exit: on deallocation of a running thread for @@ -681,7 +684,7 @@ def background_thread(evt): self.assertEqual(out, b'') self.assertEqual(err, b'') - @unittest.skip('TODO: RUSTPYTHON; flaky') + @unittest.skip("TODO: RUSTPYTHON; flaky") @skip_unless_reliable_fork def test_is_alive_after_fork(self): # Try hard to trigger #18418: is_alive() could sometimes be True on @@ -745,7 +748,7 @@ def test_main_thread_after_fork(self): "main ident True\n" "current is main True\n") - @unittest.skip("TODO: RUSTPYTHON flaky; process timeout after fork") + @unittest.skip("TODO: RUSTPYTHON; flaky; process timeout after fork") @skip_unless_reliable_fork @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") def test_main_thread_after_fork_from_nonmain_thread(self): @@ -1016,7 +1019,7 @@ def noop_trace(frame, event, arg): finally: threading.settrace(old_trace) - @unittest.expectedFailure # TODO: RUSTPYTHON + @unittest.expectedFailure # TODO: RUSTPYTHON def test_gettrace_all_threads(self): def fn(*args): pass old_trace = threading.gettrace() @@ -1055,7 +1058,7 @@ def fn(*args): pass finally: threading.setprofile(old_profile) - @unittest.expectedFailure # TODO: RUSTPYTHON + @unittest.expectedFailure # TODO: RUSTPYTHON def test_getprofile_all_threads(self): def fn(*args): pass old_profile = threading.getprofile() @@ -1179,6 +1182,77 @@ def __del__(self): self.assertEqual(out.strip(), b"OK") self.assertIn(b"can't create new thread at interpreter shutdown", err) + def test_join_daemon_thread_in_finalization(self): + # gh-123940: Py_Finalize() prevents other threads from running Python + # code, so join() can not succeed unless the thread is already done. + # (Non-Python threads, that is `threading._DummyThread`, can't be + # joined at all.) + # We raise an exception rather than hang. + for timeout in (None, 10): + with self.subTest(timeout=timeout): + code = textwrap.dedent(f""" + import threading + + + def loop(): + while True: + pass + + + class Cycle: + def __init__(self): + self.self_ref = self + self.thr = threading.Thread( + target=loop, daemon=True) + self.thr.start() + + def __del__(self): + assert self.thr.is_alive() + try: + self.thr.join(timeout={timeout}) + except PythonFinalizationError: + assert self.thr.is_alive() + print('got the correct exception!') + + # Cycle holds a reference to itself, which ensures it is + # cleaned up during the GC that runs after daemon threads + # have been forced to exit during finalization. + Cycle() + """) + rc, out, err = assert_python_ok("-c", code) + self.assertEqual(err, b"") + self.assertIn(b"got the correct exception", out) + + def test_join_finished_daemon_thread_in_finalization(self): + # (see previous test) + # If the thread is already finished, join() succeeds. + code = textwrap.dedent(""" + import threading + done = threading.Event() + + def set_event(): + done.set() + + class Cycle: + def __init__(self): + self.self_ref = self + self.thr = threading.Thread(target=set_event, daemon=True) + self.thr.start() + self.thr.join() + + def __del__(self): + assert done.is_set() + assert not self.thr.is_alive() + self.thr.join() + assert not self.thr.is_alive() + print('all clear!') + + Cycle() + """) + rc, out, err = assert_python_ok("-c", code) + self.assertEqual(err, b"") + self.assertIn(b"all clear", out) + def test_start_new_thread_failed(self): # gh-109746: if Python fails to start newly created thread # due to failure of underlying PyThread_start_new_thread() call, @@ -1214,36 +1288,6 @@ def f(): self.assertEqual(out, b'ok') self.assertEqual(err, b'') - - @skip_unless_reliable_fork - @unittest.skipUnless(hasattr(threading, 'get_native_id'), "test needs threading.get_native_id()") - def test_native_id_after_fork(self): - script = """if True: - import threading - import os - from test import support - - parent_thread_native_id = threading.current_thread().native_id - print(parent_thread_native_id, flush=True) - assert parent_thread_native_id == threading.get_native_id() - childpid = os.fork() - if childpid == 0: - print(threading.current_thread().native_id, flush=True) - assert threading.current_thread().native_id == threading.get_native_id() - else: - try: - assert parent_thread_native_id == threading.current_thread().native_id - assert parent_thread_native_id == threading.get_native_id() - finally: - support.wait_process(childpid, exitcode=0) - """ - rc, out, err = assert_python_ok('-c', script) - self.assertEqual(rc, 0) - self.assertEqual(err, b"") - native_ids = out.strip().splitlines() - self.assertEqual(len(native_ids), 2) - self.assertNotEqual(native_ids[0], native_ids[1]) - @cpython_only def test_finalize_daemon_thread_hang(self): if support.check_sanitizer(thread=True, memory=True): @@ -1314,11 +1358,66 @@ def do_flush(*args, **kwargs): ''') assert_python_ok("-c", script) - @unittest.skip('TODO: RUSTPYTHON; Thread._tstate_lock not implemented') + @skip_unless_reliable_fork + @unittest.skipUnless(hasattr(threading, 'get_native_id'), "test needs threading.get_native_id()") + def test_native_id_after_fork(self): + script = """if True: + import threading + import os + from test import support + + parent_thread_native_id = threading.current_thread().native_id + print(parent_thread_native_id, flush=True) + assert parent_thread_native_id == threading.get_native_id() + childpid = os.fork() + if childpid == 0: + print(threading.current_thread().native_id, flush=True) + assert threading.current_thread().native_id == threading.get_native_id() + else: + try: + assert parent_thread_native_id == threading.current_thread().native_id + assert parent_thread_native_id == threading.get_native_id() + finally: + support.wait_process(childpid, exitcode=0) + """ + rc, out, err = assert_python_ok('-c', script) + self.assertEqual(rc, 0) + self.assertEqual(err, b"") + native_ids = out.strip().splitlines() + self.assertEqual(len(native_ids), 2) + self.assertNotEqual(native_ids[0], native_ids[1]) + + def test_stop_the_world_during_finalization(self): + # gh-137433: Test functions that trigger a stop-the-world in the free + # threading build concurrent with interpreter finalization. + script = """if True: + import gc + import sys + import threading + NUM_THREADS = 5 + b = threading.Barrier(NUM_THREADS + 1) + def run_in_bg(): + b.wait() + while True: + sys.setprofile(None) + gc.collect() + + for _ in range(NUM_THREADS): + t = threading.Thread(target=run_in_bg, daemon=True) + t.start() + + b.wait() + print("Exiting...") + """ + rc, out, err = assert_python_ok('-c', script) + self.assertEqual(rc, 0) + self.assertEqual(err, b"") + self.assertEqual(out.strip(), b"Exiting...") + + @unittest.skip("TODO: RUSTPYTHON; Thread._tstate_lock not implemented") def test_tstate_lock(self): return super().test_tstate_lock() - class ThreadJoinOnShutdown(BaseTestCase): def _run_and_join(self, script): @@ -1370,7 +1469,7 @@ def test_2_join_in_forked_process(self): """ self._run_and_join(script) - @unittest.skip('TODO: RUSTPYTHON; flaky test') + @unittest.skip("TODO: RUSTPYTHON; flaky test") @skip_unless_reliable_fork def test_3_join_in_forked_from_thread(self): # Like the test above, but fork() was called from a worker thread @@ -1471,8 +1570,7 @@ def thread1(): self.assertEqual(out.strip(), b"OK") self.assertEqual(rc, 0) - # TODO: RUSTPYTHON - parking_lot mutex not fork-safe, child may SIGSEGV - @unittest.skip("TODO: RUSTPYTHON - flaky, parking_lot mutex not fork-safe") + @unittest.skip("TODO: RUSTPYTHON; - flaky, parking_lot mutex not fork-safe") @skip_unless_reliable_fork def test_reinit_tls_after_fork(self): # Issue #13817: fork() would deadlock in a multithreaded program with @@ -1537,7 +1635,7 @@ def pipe(self): os.set_blocking(r, False) return (r, w) - @unittest.expectedFailure # TODO: RUSTPYTHON + @unittest.expectedFailure # TODO: RUSTPYTHON def test_threads_join(self): # Non-daemon threads should be joined at subinterpreter shutdown # (issue #18808) @@ -1566,7 +1664,7 @@ def f(): # The thread was joined properly. self.assertEqual(os.read(r, 1), b"x") - @unittest.expectedFailure # TODO: RUSTPYTHON + @unittest.expectedFailure # TODO: RUSTPYTHON def test_threads_join_2(self): # Same as above, but a delay gets introduced after the thread's # Python code returned but before the thread state is deleted. @@ -1643,6 +1741,7 @@ def task(): self.assertEqual(os.read(r_interp, 1), DONE) @cpython_only + @support.skip_if_sanitizer(thread=True, memory=True) def test_daemon_threads_fatal_error(self): import_module("_testcapi") subinterp_code = f"""if 1: @@ -1661,10 +1760,7 @@ def f(): _testcapi.run_in_subinterp(%r) """ % (subinterp_code,) - with test.support.SuppressCrashReport(): - rc, out, err = assert_python_failure("-c", script) - self.assertIn("Fatal Python error: Py_EndInterpreter: " - "not the last thread", err.decode()) + assert_python_ok("-c", script) def _check_allowed(self, before_start='', *, allowed=True, @@ -1755,7 +1851,7 @@ def test_releasing_unacquired_lock(self): lock = threading.Lock() self.assertRaises(RuntimeError, lock.release) - @unittest.skip('TODO: RUSTPYTHON; flaky test') + @unittest.skip("TODO: RUSTPYTHON; flaky test") @requires_subprocess() def test_recursion_limit(self): # Issue 9670 @@ -2139,7 +2235,7 @@ def __init__(self, a, *, b) -> None: CustomRLock(1, b=2) self.assertEqual(warnings_log, []) - @unittest.skip('TODO: RUSTPYTHON; flaky test') + @unittest.skip("TODO: RUSTPYTHON; flaky test") def test_different_thread(self): return super().test_different_thread() @@ -2153,7 +2249,7 @@ class ConditionAsRLockTests(lock_tests.RLockTests): def test_recursion_count(self): self.skipTest("Condition does not expose _recursion_count()") - @unittest.skip('TODO: RUSTPYTHON; flaky test') + @unittest.skip("TODO: RUSTPYTHON; flaky test") def test_different_thread(self): return super().test_different_thread() @@ -2179,6 +2275,118 @@ def test__all__(self): support.check__all__(self, threading, ('threading', '_thread'), extra=extra, not_exported=not_exported) + @unittest.skipUnless(hasattr(_thread, 'set_name'), "missing _thread.set_name") + @unittest.skipUnless(hasattr(_thread, '_get_name'), "missing _thread._get_name") + def test_set_name(self): + # Ensure main thread name is restored after test + self.addCleanup(_thread.set_name, _thread._get_name()) + + # set_name() limit in bytes + truncate = getattr(_thread, "_NAME_MAXLEN", None) + limit = truncate or 100 + + tests = [ + # test short ASCII name + "CustomName", + + # test short non-ASCII name + "namé€", + + # embedded null character: name is truncated + # at the first null character + "embed\0null", + + # Test long ASCII names (not truncated) + "x" * limit, + + # Test long ASCII names (truncated) + "x" * (limit + 10), + + # Test long non-ASCII name (truncated) + "x" * (limit - 1) + "é€", + + # Test long non-BMP names (truncated) creating surrogate pairs + # on Windows + "x" * (limit - 1) + "\U0010FFFF", + "x" * (limit - 2) + "\U0010FFFF" * 2, + "x" + "\U0001f40d" * limit, + "xx" + "\U0001f40d" * limit, + "xxx" + "\U0001f40d" * limit, + "xxxx" + "\U0001f40d" * limit, + ] + if os_helper.FS_NONASCII: + tests.append(f"nonascii:{os_helper.FS_NONASCII}") + if os_helper.TESTFN_UNENCODABLE: + tests.append(os_helper.TESTFN_UNENCODABLE) + + if sys.platform.startswith("sunos"): + # Use ASCII encoding on Solaris/Illumos/OpenIndiana + encoding = "ascii" + else: + encoding = sys.getfilesystemencoding() + + def work(): + nonlocal work_name + work_name = _thread._get_name() + + for name in tests: + if not support.MS_WINDOWS: + encoded = name.encode(encoding, "replace") + if b'\0' in encoded: + encoded = encoded.split(b'\0', 1)[0] + if truncate is not None: + encoded = encoded[:truncate] + if sys.platform.startswith("sunos"): + expected = encoded.decode("ascii", "surrogateescape") + else: + expected = os.fsdecode(encoded) + else: + size = 0 + chars = [] + for ch in name: + if ord(ch) > 0xFFFF: + size += 2 + else: + size += 1 + if size > truncate: + break + chars.append(ch) + expected = ''.join(chars) + + if '\0' in expected: + expected = expected.split('\0', 1)[0] + + with self.subTest(name=name, expected=expected, thread="main"): + _thread.set_name(name) + self.assertEqual(_thread._get_name(), expected) + + with self.subTest(name=name, expected=expected, thread="worker"): + work_name = None + thread = threading.Thread(target=work, name=name) + thread.start() + thread.join() + self.assertEqual(work_name, expected, + f"{len(work_name)=} and {len(expected)=}") + + @unittest.skipUnless(hasattr(_thread, 'set_name'), "missing _thread.set_name") + @unittest.skipUnless(hasattr(_thread, '_get_name'), "missing _thread._get_name") + def test_change_name(self): + # Change the name of a thread while the thread is running + + name1 = None + name2 = None + def work(): + nonlocal name1, name2 + name1 = _thread._get_name() + threading.current_thread().name = "new name" + name2 = _thread._get_name() + + thread = threading.Thread(target=work, name="name") + thread.start() + thread.join() + self.assertEqual(name1, "name") + self.assertEqual(name2, "new name") + class InterruptMainTests(unittest.TestCase): def check_interrupt_main_with_signal_handler(self, signum): @@ -2204,7 +2412,7 @@ def check_interrupt_main_noerror(self, signum): # Restore original handler signal.signal(signum, handler) - @unittest.skip('TODO: RUSTPYTHON; flaky') + @unittest.skip("TODO: RUSTPYTHON; flaky") @requires_gil_enabled("gh-118433: Flaky due to a longstanding bug") def test_interrupt_main_subthread(self): # Calling start_new_thread with a function that executes interrupt_main diff --git a/Lib/test/test_threading_local.py b/Lib/test/test_threading_local.py index 8d752dbb7aa..0c805c5b055 100644 --- a/Lib/test/test_threading_local.py +++ b/Lib/test/test_threading_local.py @@ -26,7 +26,7 @@ def target(local, weaklist): class BaseLocalTest: - @unittest.skip('TODO: RUSTPYTHON; flaky test') + @unittest.skip("TODO: RUSTPYTHON; flaky test") def test_local_refs(self): self._local_refs(20) self._local_refs(50) @@ -228,14 +228,14 @@ def __eq__(self, other): class ThreadLocalTest(unittest.TestCase, BaseLocalTest): _local = _thread._local - @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: TypeError not raised by _local - def test_arguments(self): - return super().test_arguments() - - @unittest.expectedFailure # TODO: RUSTPYTHON + @unittest.expectedFailure # TODO: RUSTPYTHON def test_cycle_collection(self): return super().test_cycle_collection() + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: TypeError not raised by _local + def test_arguments(self): + return super().test_arguments() + class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest): _local = _threading_local.local From d7a79020034c86a3265ddddad52031ac36c54906 Mon Sep 17 00:00:00 2001 From: ShaharNaveh <50263213+ShaharNaveh@users.noreply.github.com> Date: Wed, 3 Jun 2026 09:49:19 +0300 Subject: [PATCH 2/5] Mark failing tests --- Lib/test/test_thread.py | 1 + Lib/test/test_threading.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index ac924728feb..b5fee6dd36c 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -322,6 +322,7 @@ def test_set_done_unstarted(self): with self.assertRaisesRegex(RuntimeError, "thread not started"): handle._set_done() + @unittest.skip("TODO: RUSTPYTHON; panic") def test_start_duplicate_handle(self): lock = thread.allocate_lock() lock.acquire() diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index be1f72b30e3..5bee24cc7f6 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -323,6 +323,7 @@ def f(mutex): # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) # exposed at the Python level. This test relies on ctypes to get at it. + @unittest.expectedFailure # TODO: RUSTPYTHON def test_PyThreadState_SetAsyncExc(self): ctypes = import_module("ctypes") @@ -1182,6 +1183,7 @@ def __del__(self): self.assertEqual(out.strip(), b"OK") self.assertIn(b"can't create new thread at interpreter shutdown", err) + @unittest.expectedFailure # TODO: RUSTPYTHON def test_join_daemon_thread_in_finalization(self): # gh-123940: Py_Finalize() prevents other threads from running Python # code, so join() can not succeed unless the thread is already done. From 14de105c1ca0790f3af985380394802e974a4c6c Mon Sep 17 00:00:00 2001 From: ShaharNaveh <50263213+ShaharNaveh@users.noreply.github.com> Date: Wed, 3 Jun 2026 10:30:04 +0300 Subject: [PATCH 3/5] nits --- crates/vm/src/stdlib/_thread.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/vm/src/stdlib/_thread.rs b/crates/vm/src/stdlib/_thread.rs index 0af8d7add38..83af08188f1 100644 --- a/crates/vm/src/stdlib/_thread.rs +++ b/crates/vm/src/stdlib/_thread.rs @@ -53,6 +53,7 @@ pub(crate) mod _thread { // this is a value in seconds #[pyattr] const TIMEOUT_MAX: f64 = (TIMEOUT_MAX_IN_MICROSECONDS / 1_000_000) as f64; + #[pyattr] fn error(vm: &VirtualMachine) -> PyTypeRef { vm.ctx.exceptions.runtime_error.to_owned() @@ -76,22 +77,23 @@ pub(crate) mod _thread { Ok(true) } true if timeout < 0.0 => { - Err(vm - .new_value_error("timeout value must be a non-negative number".to_owned())) + Err(vm.new_value_error("timeout value must be a non-negative number")) } true => { if timeout > TIMEOUT_MAX { - return Err(vm.new_overflow_error("timeout value is too large".to_owned())); + return Err(vm.new_overflow_error("timeout value is too large")); } Ok(vm.allow_threads(|| mu.try_lock_for(Duration::from_secs_f64(timeout)))) } - false if timeout != -1.0 => Err(vm - .new_value_error("can't specify a timeout for a non-blocking call".to_owned())), + false if timeout != -1.0 => { + Err(vm.new_value_error("can't specify a timeout for a non-blocking call")) + } false => Ok(mu.try_lock()), } }}; } + macro_rules! repr_lock_impl { ($zelf:expr) => {{ let status = if $zelf.mu.is_locked() { From 1a6d0f9c7a52bc986192ae88873663028677dd3c Mon Sep 17 00:00:00 2001 From: ShaharNaveh <50263213+ShaharNaveh@users.noreply.github.com> Date: Wed, 3 Jun 2026 11:35:56 +0300 Subject: [PATCH 4/5] Validate stack_size --- crates/vm/src/stdlib/_thread.rs | 38 ++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/crates/vm/src/stdlib/_thread.rs b/crates/vm/src/stdlib/_thread.rs index 83af08188f1..7291026ed58 100644 --- a/crates/vm/src/stdlib/_thread.rs +++ b/crates/vm/src/stdlib/_thread.rs @@ -15,7 +15,7 @@ pub(crate) use _thread::{ pub(crate) mod _thread { use crate::{ AsObject, Py, PyPayload, PyRef, PyResult, VirtualMachine, - builtins::{PyDictRef, PyStr, PyTupleRef, PyType, PyTypeRef, PyUtf8StrRef}, + builtins::{PyDictRef, PyIntRef, PyStr, PyTupleRef, PyType, PyTypeRef, PyUtf8StrRef}, common::wtf8::Wtf8Buf, frame::FrameRef, function::{ArgCallable, FuncArgs, KwArgs, OptionalArg, PySetterValue, TimeoutSeconds}, @@ -50,6 +50,26 @@ pub(crate) mod _thread { #[cfg(target_os = "windows")] const TIMEOUT_MAX_IN_MICROSECONDS: i64 = 0xffffffff * 1_000; + /// [CPython `SYSTEM_PAGE_SIZE`](https://github.com/python/cpython/blob/v3.14.5/Include/internal/pycore_obmalloc.h#L170) + const SYSTEM_PAGE_SIZE: usize = 4 * 1024; + + // TODO: Check for sanitize once https://github.com/rust-lang/rust/issues/39699 is closed + /// [CPython `_PyOS_LOG2_STACK_MARGIN`](https://github.com/python/cpython/blob/v3.14.5/Include/internal/pycore_pythonrun.h#L41-L47) + const PY_OS_LOG2_STACK_MARGIN: u32 = cfg_select! { + debug_assertions => 12, + _ => 11, + }; + + /// [CPython `_PyOS_STACK_MARGIN`](https://github.com/python/cpython/blob/v3.14.5/Include/internal/pycore_pythonrun.h#L48) + const PY_OS_STACK_MARGIN: usize = 1 << PY_OS_LOG2_STACK_MARGIN; + + /// [CPython `_PyOS_STACK_MARGIN_BYTES`](https://github.com/python/cpython/blob/v3.14.5/Include/internal/pycore_pythonrun.h#L49) + const PY_OS_STACK_MARGIN_BYTES: usize = PY_OS_STACK_MARGIN * size_of::<*const ()>(); + + // TODO: Check for sanitize once https://github.com/rust-lang/rust/issues/39699 is closed + /// [CPython `_PyOS_MIN_STACK_SIZE`](https://github.com/python/cpython/blob/v3.14.5/Include/internal/pycore_pythonrun.h#L57-L61) + const PY_OS_MIN_STACK_SIZE: usize = PY_OS_STACK_MARGIN_BYTES * 3; + // this is a value in seconds #[pyattr] const TIMEOUT_MAX: f64 = (TIMEOUT_MAX_IN_MICROSECONDS / 1_000_000) as f64; @@ -605,10 +625,18 @@ pub(crate) mod _thread { } #[pyfunction] - fn stack_size(size: OptionalArg, vm: &VirtualMachine) -> usize { - let size = size.unwrap_or(0); - // TODO: do validation on this to make sure it's not too small - vm.state.stacksize.swap(size) + fn stack_size(size: OptionalArg, vm: &VirtualMachine) -> PyResult { + const MIN_SIZE: usize = PY_OS_MIN_STACK_SIZE + SYSTEM_PAGE_SIZE; + + let Ok(size) = size.map_or(Ok(0), |v| v.try_to_primitive(vm)) else { + return Err(vm.new_value_error(format!("size must be at least {MIN_SIZE} bytes"))); + }; + + if size != 0 && size < MIN_SIZE { + return Err(vm.new_value_error(format!("size must be at least {MIN_SIZE} bytes"))); + } + + Ok(vm.state.stacksize.swap(size)) } #[pyfunction] From 52acc28b7a58d0b7846d4c837c0ecbba685ba9ed Mon Sep 17 00:00:00 2001 From: ShaharNaveh <50263213+ShaharNaveh@users.noreply.github.com> Date: Wed, 3 Jun 2026 11:46:08 +0300 Subject: [PATCH 5/5] Mark failing --- Lib/test/test_thread.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index b5fee6dd36c..bf04cc536dc 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -150,6 +150,7 @@ def task(): support.gc_collect() # For PyPy or other GCs. self.assertEqual(thread._count(), orig) + @unittest.expectedFailure # TODO: RUSTPYTHON def test_unraisable_exception(self): def task(): started.release() @@ -322,7 +323,7 @@ def test_set_done_unstarted(self): with self.assertRaisesRegex(RuntimeError, "thread not started"): handle._set_done() - @unittest.skip("TODO: RUSTPYTHON; panic") + @unittest.skipIf(__import__("sys").platform == "linux", "TODO: RUSTPYTHON; panic") def test_start_duplicate_handle(self): lock = thread.allocate_lock() lock.acquire() @@ -338,6 +339,7 @@ def func(): lock.release() handle.join() + @unittest.skipIf(__import__("sys").platform == "linux", "TODO: RUSTPYTHON; panic") def test_start_with_none_handle(self): def func(): pass