BUG: poor multithreaded performance scaling for small arrays (nogil) #27786


Closed
eundersander opened this issue Nov 18, 2024 · 14 comments · Fixed by #27896
Labels: 00 - Bug, 39 - free-threading (PRs and issues related to support for free-threading CPython, a.k.a. no-GIL, PEP 703)

Comments

@eundersander
eundersander commented Nov 18, 2024

Describe the issue:

Using python 3.13 free threading, I observe that multithreaded performance (MFLOPS) scales poorly for numpy array computation, especially on small arrays. For comparison, performance scales well for (1) multiprocess computation, and (2) multithreaded/multiprocess ordinary python list computation. Although I measure MFLOPS here, I would guess the underlying performance issue is some per-array-access overhead, perhaps a lock that results in thread contention.

In the attached benchmark, the workers are embarrassingly parallel. I create an array/list in each worker thread/process and then do computation on it. I only time the computation, not the setup.

The main takeaway from the plot below is that NumPy performance drops dramatically when using 8+ threads in the same process (solid orange line).

The original reporter used an AMD Ryzen Threadripper PRO 5955WX (16 cores), but @ngoldbaum edited the description and reproducer script and used a MacBook Pro M3 Max to reproduce the original report with the updated script.

[Plot: mflops_array_length_100.png]

Benchmark text output:

goldbaum at Nathans-MBP in ~/Documents/numpy-experiments
○  PYTHON_GIL=0 python parallel_bench.py --array-length 100 --num-iterations 5000
Python Version: 3.13.0 experimental free-threading build (main, Nov  5 2024, 16:45:19) [Clang 16.0.0 (clang-1600.0.26.3)]
NumPy Version: 2.1.3
os.cpu_count(): 11
1 numpy threads
1 numpy processes
1 list threads
1 list processes
2 numpy threads
2 numpy processes
2 list threads
2 list processes
4 numpy threads
4 numpy processes
4 list threads
4 list processes
8 numpy threads
8 numpy processes
8 list threads
8 list processes
16 numpy threads
16 numpy processes
16 list threads
16 list processes
32 numpy threads
32 numpy processes
32 list threads
32 list processes
64 numpy threads
64 numpy processes
64 list threads
64 list processes
128 numpy threads
128 numpy processes
128 list threads
128 list processes
256 numpy threads
256 numpy processes
256 list threads
256 list processes
512 numpy threads
512 numpy processes
512 list threads
512 list processes
1024 numpy threads
1024 numpy processes
1024 list threads
1024 list processes
2048 numpy threads
2048 numpy processes
2048 list threads
2048 list processes

# Workers  | Numpy Threads           | Numpy Processes         | List Threads            | List Processes
-----------+-------------------------+-------------------------+-------------------------+------------------------
1          | 0.01s, 84.01 MFLOPS     | 0.30s, 3.34 MFLOPS      | 0.01s, 88.63 MFLOPS     | 0.31s, 3.21 MFLOPS
2          | 0.01s, 154.94 MFLOPS    | 0.33s, 6.05 MFLOPS      | 0.01s, 171.32 MFLOPS    | 0.33s, 6.01 MFLOPS
4          | 0.01s, 288.72 MFLOPS    | 0.38s, 10.49 MFLOPS     | 0.01s, 332.36 MFLOPS    | 0.38s, 10.40 MFLOPS
8          | 0.04s, 206.42 MFLOPS    | 0.52s, 15.28 MFLOPS     | 0.02s, 340.70 MFLOPS    | 0.53s, 15.22 MFLOPS
16         | 0.11s, 141.60 MFLOPS    | 1.05s, 15.22 MFLOPS     | 0.02s, 820.67 MFLOPS    | 1.05s, 15.30 MFLOPS
32         | 0.24s, 134.02 MFLOPS    | 2.08s, 15.37 MFLOPS     | 0.02s, 1634.83 MFLOPS   | 2.24s, 14.29 MFLOPS
64         | 0.48s, 132.52 MFLOPS    | 4.06s, 15.78 MFLOPS     | 0.12s, 518.98 MFLOPS    | 4.04s, 15.84 MFLOPS
128        | 2.96s, 43.29 MFLOPS     | 8.07s, 15.87 MFLOPS     | 0.23s, 554.86 MFLOPS    | 8.33s, 15.37 MFLOPS
256        | 15.05s, 17.01 MFLOPS    | N/A                     | 0.45s, 563.20 MFLOPS    | N/A
512        | 49.28s, 10.39 MFLOPS    | N/A                     | 0.23s, 2187.81 MFLOPS   | N/A
1024       | 160.21s, 6.39 MFLOPS    | N/A                     | 1.56s, 657.09 MFLOPS    | N/A
2048       | 599.69s, 3.42 MFLOPS    | N/A                     | 2.58s, 793.79 MFLOPS    | N/A
Plot saved to mflops_array_length_100.png

Reproduce the code example:

import time
import threading
import multiprocessing
import numpy as np
import sys
import os
import argparse
import matplotlib.pyplot as plt

def numpy_worker(barrier, array_length, num_iterations):
    """Worker function for NumPy computations."""
    x = np.arange(array_length, dtype=np.float64)
    barrier.wait()
    for _ in range(num_iterations):
        x += 0.01  # Element-wise operation
        x[0] += x.mean() * 0.01  # Reduction operation

def list_worker(barrier, array_length, num_iterations):
    """Worker function for list computations."""
    x = [float(xi) for xi in range(array_length)]
    barrier.wait()  # Synchronize start
    for _ in range(num_iterations):
        x = [xi + 0.01 for xi in x]  # Element-wise operation
        x[0] += sum(x) / len(x) * 0.01  # Reduction operation

def launch_workers(worker_func, num_workers, method, array_length, num_iterations):
    """Launches workers using threading or multiprocessing."""
    if method == 'threads':
        barrier = threading.Barrier(num_workers + 1)
        workers = []
        for _ in range(num_workers):
            t = threading.Thread(target=worker_func, args=(barrier, array_length, num_iterations))
            workers.append(t)
            t.start()
        barrier.wait()  # Synchronize all threads
        start_time = time.time()
        for t in workers:
            t.join()
        end_time = time.time()
    elif method == 'processes':
        # Use a multiprocessing.Event for synchronization
        start_event = multiprocessing.Event()
        workers = []
        for _ in range(num_workers):
            p = multiprocessing.Process(target=worker_func, args=(start_event, array_length, num_iterations))
            workers.append(p)
            p.start()
        start_time = time.time()
        start_event.set()  # Signal all processes to start
        for p in workers:
            p.join()
        end_time = time.time()
    else:
        raise ValueError("Unknown method")
    return start_time, end_time

def run_benchmark(kind, method, num_workers, args):
    """Runs the benchmark for the specified configuration."""
    if kind == 'numpy':
        worker = numpy_worker
    elif kind == 'list':
        worker = list_worker
    else:
        raise ValueError("Unknown kind")

    # Start workers and measure time
    start_time, end_time = launch_workers(worker, num_workers, method, args.array_length, args.num_iterations)

    elapsed_time = end_time - start_time
    total_ops = num_workers * args.num_iterations * args.array_length * 2
    mflop_per_sec = total_ops / (elapsed_time * 1e6)
    return elapsed_time, mflop_per_sec

def print_results_table(results, num_workers_list):
    """Prints the benchmark results in a formatted table."""
    headers = ["# Workers", "Numpy Threads", "Numpy Processes", "List Threads", "List Processes"]
    row_format = "{:<10} | {:<23} | {:<23} | {:<23} | {:<23}"
    separator = "-" * 10 + "-+-" + "-+-".join(["-" * 23] * 4)

    print()
    print(row_format.format(*headers))
    print(separator)
    for num_workers in num_workers_list:
        row = [str(num_workers)]
        for key in ["numpy_threads", "numpy_processes", "list_threads", "list_processes"]:
            if key in results[num_workers]:
                elapsed, mflop = results[num_workers][key]
                row.append(f"{elapsed:.2f}s, {mflop:.2f} MFLOPS")
            else:
                row.append("N/A")
        print(row_format.format(*row))

def save_plot(plot_data, array_length):
    """Generates and saves the benchmark plot."""
    plt.figure(figsize=(6.5, 4), tight_layout=True)
    styles = {
        'numpy_threads': ('solid', '#FFA500'),
        'numpy_processes': ('dotted', '#FFA500'),
        'list_threads': ('solid', '#1E90FF'),
        'list_processes': ('dotted', '#1E90FF'),
    }

    for key in plot_data:
        x = plot_data[key]['num_workers']
        y = plot_data[key]['mflop_per_sec']
        linestyle, color = styles[key]
        plt.plot(x, y, linestyle=linestyle, color=color, marker='o', label=key.replace('_', ' ').title(), linewidth=2)

    # add labels for the far-right datapoints
    printed_labels = []
    # Add padding to ensure labels outside the plot fit within the figure
    plt.gcf().subplots_adjust(right=0.8)
    for key in plot_data:
        x = plot_data[key]['num_workers']
        y = plot_data[key]['mflop_per_sec']
        _, color = styles[key]

        # Check if the current value is sufficiently distinct from previously printed labels
        if all(abs(y[-1] - prev) / max(y[-1], prev) > 0.2 for prev in printed_labels):
            # Position the label just outside the right edge of the plot
            x_pos = plt.xlim()[1] * 1.25  # 125% of the x-axis range (outside the plot)
            plt.text(
                x_pos, y[-1], f"{y[-1]:.1f}",
                fontsize=12, color=color, ha='left', va='center'
            )
            printed_labels.append(y[-1])  # Mark this label as printed
            
    plt.xlabel('Number of Workers', fontsize=14)
    plt.ylabel('MFLOPS', fontsize=14)
    plt.title(f'MFLOPS for array length {array_length}', fontsize=16)
    plt.legend(fontsize=12)
    plt.xscale('log', base=2)
    plt.yscale('log')
    plt.xticks(plot_data['numpy_threads']['num_workers'], labels=plot_data['numpy_threads']['num_workers'], fontsize=12)
    plt.yticks(fontsize=12)
    plt.grid(True, which="both", ls="--", linewidth=0.5)
    plt.gcf().subplots_adjust(right=0.9, bottom=0.2)
    filename = f'mflops_array_length_{array_length}.png'
    plt.savefig(filename)
    print(f'Plot saved to {filename}')

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Benchmark multithreading and multiprocessing with NumPy and list operations.')
    parser.add_argument('--array-length', type=int, default=1000, help='Length of the arrays/lists used in computations.')
    parser.add_argument('--num-iterations', type=int, default=4000, help='Number of iterations each worker performs.')
    parser.add_argument('--num-workers-list', type=int, nargs='+', default=[1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048], help='List of worker counts to benchmark.')
    args = parser.parse_args()

    print(f"Python Version: {sys.version}")
    print(f"NumPy Version: {np.__version__}")
    print(f"os.cpu_count(): {os.cpu_count()}")

    results = {num_workers: {} for num_workers in args.num_workers_list}
    plot_data = {
        'numpy_threads': {'num_workers': [], 'mflop_per_sec': []},
        'numpy_processes': {'num_workers': [], 'mflop_per_sec': []},
        'list_threads': {'num_workers': [], 'mflop_per_sec': []},
        'list_processes': {'num_workers': [], 'mflop_per_sec': []},
    }

    for num_workers in args.num_workers_list:
        for kind in ['numpy', 'list']:
            for method in ['threads', 'processes']:
                print(num_workers, kind, method)
                if num_workers > 128 and method == 'processes':
                    continue
                elapsed_time, mflop_per_sec = run_benchmark(kind, method, num_workers, args)
                key = f"{kind}_{method}"
                results[num_workers][key] = (elapsed_time, mflop_per_sec)
                plot_data[key]['num_workers'].append(num_workers)
                plot_data[key]['mflop_per_sec'].append(mflop_per_sec)

    print_results_table(results, args.num_workers_list)
    save_plot(plot_data, args.array_length)

Error message:

No response

Python and NumPy Versions:

2.1.3
3.13.0 experimental free-threading build | packaged by conda-forge | (main, Oct 8 2024, 20:16:19) [GCC 13.3.0]

Runtime Environment:


[{'numpy_version': '2.1.3',
  'python': '3.13.0 experimental free-threading build | packaged by '
            'conda-forge | (main, Oct  8 2024, 20:16:19) [GCC 13.3.0]',
  'uname': uname_result(system='Linux', node='eric-Lambda-Vector', release='6.8.0-48-generic', version='#48~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Mon Oct  7 11:24:13 UTC 2', machine='x86_64')},
 {'simd_extensions': {'baseline': ['SSE', 'SSE2', 'SSE3'],
                      'found': ['SSSE3',
                                'SSE41',
                                'POPCNT',
                                'SSE42',
                                'AVX',
                                'F16C',
                                'FMA3',
                                'AVX2'],
                      'not_found': ['AVX512F',
                                    'AVX512CD',
                                    'AVX512_KNL',
                                    'AVX512_KNM',
                                    'AVX512_SKX',
                                    'AVX512_CLX',
                                    'AVX512_CNL',
                                    'AVX512_ICL']}},
 {'architecture': 'Haswell',
  'filepath': '/home/eric/miniforge3/envs/habitat-lab3-py313/lib/python3.13t/site-packages/numpy.libs/libscipy_openblas64_-ff651d7f.so',
  'internal_api': 'openblas',
  'num_threads': 32,
  'prefix': 'libscipy_openblas',
  'threading_layer': 'pthreads',
  'user_api': 'blas',
  'version': '0.3.27'}]

AMD Ryzen Threadripper PRO 5955WX 16-Cores

Context for the issue:

Our application is RL training with a robotics simulator. We use multiprocessing, with each worker doing mostly-independent CPU-heavy work. I was excited to try python 3.13 free-threading to reduce the cost of gathering results from workers--use multithreading instead of multiprocessing and thus avoid interprocess communication overhead. Instead, I see a big drop in overall performance. We use a lot of small numpy arrays for 3D math (3D positions, 4D rotation quaternions, 4x4 transform matrices, etc.).

@mattip
Member
mattip commented Nov 18, 2024

Without diving in too deeply, are you controlling for the OpenBLAS threading underneath NumPy? You might want to use threadpoolctl or set OMP_NUM_THREADS=1.

@eundersander
Author

Without diving in too deeply, are you controlling for the OpenBLAS threading underneath NumPy? You might want to use threadpoolctl or set OMP_NUM_THREADS=1.

I tried this and I don't see any significant change in results.

Using OMP_NUM_THREADS=1:

(habitat-lab3-py313) .../eric/projects/habitat-lab3> OMP_NUM_THREADS=1 PYTHON_GIL=0 python bench_parallel_numpy2.py --array-length 1000 --num-iterations 5000
Python Version: 3.13.0 experimental free-threading build | packaged by conda-forge | (main, Oct  8 2024, 20:16:19) [GCC 13.3.0]
NumPy Version: 2.1.3
os.cpu_count(): 32

# Workers  | Numpy Threads           | Numpy Processes         | List Threads            | List Processes         
-----------+-------------------------+-------------------------+-------------------------+------------------------
1          | 0.02s, 491.25 MFLOPS    | 0.02s, 435.66 MFLOPS    | 0.16s, 61.01 MFLOPS     | 0.14s, 69.07 MFLOPS    
2          | 0.03s, 690.85 MFLOPS    | 0.02s, 892.60 MFLOPS    | 0.17s, 118.17 MFLOPS    | 0.15s, 136.72 MFLOPS   
4          | 0.03s, 1282.00 MFLOPS   | 0.04s, 1005.90 MFLOPS   | 0.17s, 237.00 MFLOPS    | 0.17s, 234.51 MFLOPS   
8          | 0.03s, 3016.43 MFLOPS   | 0.04s, 1994.34 MFLOPS   | 0.17s, 461.44 MFLOPS    | 0.20s, 406.46 MFLOPS   
16         | 0.07s, 2204.74 MFLOPS   | 0.04s, 4028.53 MFLOPS   | 0.20s, 782.43 MFLOPS    | 0.19s, 835.31 MFLOPS   
32         | 0.75s, 426.18 MFLOPS    | 0.05s, 6079.83 MFLOPS   | 0.32s, 1012.60 MFLOPS   | 0.27s, 1186.59 MFLOPS  
64         | 1.69s, 377.93 MFLOPS    | 0.09s, 6768.71 MFLOPS   | 0.52s, 1227.61 MFLOPS   | 0.57s, 1122.68 MFLOPS  
128        | 11.68s, 109.56 MFLOPS   | 0.18s, 7025.79 MFLOPS   | 1.24s, 1031.60 MFLOPS   | 1.11s, 1150.95 MFLOPS  
Plot saved to mflops_array_length_1000.png

Using with threadpool_limits(limits=1, user_api='blas'):

(habitat-lab3-py313) .../eric/projects/habitat-lab3> git diff -- *bench*
diff --git a/bench_parallel_numpy2.py b/bench_parallel_numpy2.py
index 37ecd80e..6010677b 100644
--- a/bench_parallel_numpy2.py
+++ b/bench_parallel_numpy2.py
@@ -6,6 +6,7 @@ import sys
 import os
 import argparse
 import matplotlib.pyplot as plt
+from threadpoolctl import threadpool_limits
 
 def numpy_worker(sync_func, array_length, num_iterations):
     """Worker function for NumPy computations."""
@@ -162,14 +163,15 @@ if __name__ == '__main__':
         'list_processes': {'num_workers': [], 'mflop_per_sec': []}
     }
 
-    for num_workers in args.num_workers_list:
-        for kind in ['numpy', 'list']:
-            for method in ['threads', 'processes']:
-                elapsed_time, mflop_per_sec = run_benchmark(kind, method, num_workers, args)
-                key = f"{kind}_{method}"
-                results[num_workers][key] = (elapsed_time, mflop_per_sec)
-                plot_data[key]['num_workers'].append(num_workers)
-                plot_data[key]['mflop_per_sec'].append(mflop_per_sec)
+    with threadpool_limits(limits=1, user_api='blas'):
+        for num_workers in args.num_workers_list:
+            for kind in ['numpy', 'list']:
+                for method in ['threads', 'processes']:
+                    elapsed_time, mflop_per_sec = run_benchmark(kind, method, num_workers, args)
+                    key = f"{kind}_{method}"
+                    results[num_workers][key] = (elapsed_time, mflop_per_sec)
+                    plot_data[key]['num_workers'].append(num_workers)
+                    plot_data[key]['mflop_per_sec'].append(mflop_per_sec)
 
     print_results_table(results, args.num_workers_list)
     save_plot(plot_data, args.array_length)
(habitat-lab3-py313) .../eric/projects/habitat-lab3> PYTHON_GIL=0 python bench_parallel_numpy2.py --array-length 1000 --num-iterations 5000
Python Version: 3.13.0 experimental free-threading build | packaged by conda-forge | (main, Oct  8 2024, 20:16:19) [GCC 13.3.0]
NumPy Version: 2.1.3
os.cpu_count(): 32

# Workers  | Numpy Threads           | Numpy Processes         | List Threads            | List Processes         
-----------+-------------------------+-------------------------+-------------------------+------------------------
1          | 0.02s, 495.42 MFLOPS    | 0.02s, 439.03 MFLOPS    | 0.17s, 59.29 MFLOPS     | 0.15s, 68.26 MFLOPS    
2          | 0.02s, 972.59 MFLOPS    | 0.02s, 908.35 MFLOPS    | 0.17s, 119.74 MFLOPS    | 0.15s, 136.52 MFLOPS   
4          | 0.02s, 1655.31 MFLOPS   | 0.03s, 1555.79 MFLOPS   | 0.17s, 233.46 MFLOPS    | 0.17s, 235.38 MFLOPS   
8          | 0.04s, 1951.72 MFLOPS   | 0.03s, 3120.99 MFLOPS   | 0.17s, 468.99 MFLOPS    | 0.15s, 528.60 MFLOPS   
16         | 0.07s, 2401.98 MFLOPS   | 0.04s, 3770.33 MFLOPS   | 0.26s, 613.33 MFLOPS    | 0.23s, 688.85 MFLOPS   
32         | 1.02s, 313.50 MFLOPS    | 0.06s, 5601.79 MFLOPS   | 0.30s, 1052.49 MFLOPS   | 0.27s, 1201.13 MFLOPS  
64         | 1.92s, 334.18 MFLOPS    | 0.09s, 6875.34 MFLOPS   | 0.51s, 1266.58 MFLOPS   | 0.58s, 1098.56 MFLOPS  
128        | 12.58s, 101.74 MFLOPS   | 0.18s, 7236.04 MFLOPS   | 1.23s, 1036.92 MFLOPS   | 1.10s, 1165.71 MFLOPS  
Plot saved to mflops_array_length_1000.png

@mattip
Member
mattip commented Nov 18, 2024

Thanks. Now that I look at this more carefully, it seems there is some thread contention limiting NumPy's performance. Looking only at the "NumPy Threads" graphs, they appear to scale "per call": i.e. if you divide the MFLOPS by the vector length, the graphs will align. The core calculation x[0] += x.mean() * 0.01 uses mean(), which is roughly y = x.sum(); x[0] += y / np.intp(np.prod(x.shape)). The call to x.sum() must allocate a new ndarray for the answer. I wonder what happens if you avoid the allocation by using x[0] += x.mean(out=y) * 0.01 where y = np.empty(1, dtype=np.float64).
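For illustration, a minimal sketch of that out=-based worker variant (the numpy_worker_noalloc name is invented here, and a 0-d output buffer is used instead of np.empty(1); whether this actually sidesteps the contention is exactly the open question):

import numpy as np

def numpy_worker_noalloc(barrier, array_length, num_iterations):
    """Like numpy_worker, but writes the reduction into a preallocated buffer."""
    x = np.arange(array_length, dtype=np.float64)
    y = np.empty((), dtype=np.float64)  # reused 0-d buffer for the mean result
    barrier.wait()
    for _ in range(num_iterations):
        x += 0.01        # element-wise operation, in place
        x.mean(out=y)    # reduction result written into y instead of a freshly allocated array
        x[0] += y * 0.01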

@rgommers added the 39 - free-threading label (PRs and issues related to support for free-threading CPython, a.k.a. no-GIL, PEP 703) on Nov 20, 2024
@ngoldbaum
Member
ngoldbaum commented Nov 20, 2024

When I run the attached script, I get:

AttributeError: Can't get local object 'launch_workers.<locals>.sync_func'

My understanding is that closures aren't pickleable, so you can't pass them to multiprocessing the way you're doing with sync_func. Did you make some edits before submitting the issue maybe?
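To make that concrete, here is a minimal sketch (make_sync is an invented name): the standard pickle module refuses to serialize locally defined functions, which is what multiprocessing relies on when it spawns workers.

import pickle

def make_sync():
    def sync_func():  # locally defined, like the sync_func closure in launch_workers
        pass
    return sync_func

try:
    pickle.dumps(make_sync())
except Exception as exc:  # typically AttributeError: Can't pickle local object 'make_sync.<locals>.sync_func'
    print(f"{type(exc).__name__}: {exc}")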

That said if I hack the script to skip the multiprocessing benchmark then I am able to reproduce the behavior described in the issue:

[Plot: mflops_array_length_10.png]

@eundersander
Author
eundersander commented Nov 21, 2024

it seems there is some thread contention limiting NumPy's performance
they appear to scale "per call"

Agreed! Python free-threading has biased reference counting, "a fast-path for objects owned by the current thread"; I wonder if NumPy could do something similar for whatever it's doing per call.

AttributeError: Can't get local object 'launch_workers.<locals>.sync_func'
Did you make some edits before submitting the issue maybe?

Hmmm, the script runs as-is for me. Maybe some platform or version difference?

Here's an alternate snippet that just removes the synchronization for the multiprocessing case. So, the timing includes the setup (not just computation), which is sloppy, but you'll still get similar results.

def numpy_worker(sync_func, array_length, num_iterations):
    """Worker function for NumPy computations."""
    x = np.arange(array_length, dtype=np.float64)
    if sync_func:
        sync_func()  # Synchronize start
    for _ in range(num_iterations):
        x += 0.01  # Element-wise operation
        x[0] += x.mean() * 0.01  # Reduction operation

def list_worker(sync_func, array_length, num_iterations):
    """Worker function for list computations."""
    x = [float(xi) for xi in range(array_length)]
    if sync_func:
        sync_func()  # Synchronize start
    for _ in range(num_iterations):
        x = [xi + 0.01 for xi in x]  # Element-wise operation
        x[0] += sum(x) / len(x) * 0.01  # Reduction operation

def launch_workers(worker_func, num_workers, method, array_length, num_iterations):
    """Launches workers using threading or multiprocessing."""
    if method == 'threads':
        barrier = threading.Barrier(num_workers + 1)
        def sync_func():
            barrier.wait()
        workers = []
        for _ in range(num_workers):
            t = threading.Thread(target=worker_func, args=(sync_func, array_length, num_iterations))
            workers.append(t)
            t.start()
        barrier.wait()  # Synchronize all threads
        start_time = time.time()
        for t in workers:
            t.join()
        end_time = time.time()
    elif method == 'processes':
        start_time = time.time()
        workers = []
        for _ in range(num_workers):
            p = multiprocessing.Process(target=worker_func, args=(None, array_length, num_iterations))
            workers.append(p)
            p.start()
        for p in workers:
            p.join()
        end_time = time.time()
    else:
        raise ValueError("Unknown method")
    return start_time, end_time

@ngoldbaum
Member

Thanks! I wonder if your site environment patches Python to use something like dill or cloudpickle instead of the regular pickle.

I'm going to try to profile this today using a few tools to see if this is due to lock contention inside CPython or inside NumPy. If it's due to locking that we added in NumPy to make things thread-safe, maybe we can simply disable it or find a quick fix for the scaling issue.

It's also probably worth adding documentation to py-free-threading.github.io to help with debugging issues like this. This is the first time this has come up so far in NumPy but doubtless it will happen a lot across the ecosystem.

@ngoldbaum
Member
ngoldbaum commented Nov 21, 2024

I rewrote the test script to pass the threading.Barrier or multiprocessing.Event into the worker function (which avoids the issue with pickling a closure), added some extra prints and skipped the very slow multiprocessing-based runs for large worker counts.

I went ahead and edited the issue description so it only includes one graph and includes working reproducer code, hope you don't mind @eundersander.

@ngoldbaum
Member
ngoldbaum commented Nov 21, 2024

I've attached a flamegraph I generated using the cargo-flamegraph CLI tool, which seems to do a great job of sampling Python C extensions.

[Flamegraph image]

I ran this script:

import threading

import numpy as np

def numpy_worker(barrier, array_length, num_iterations):
    """Worker function for NumPy computations."""
    x = np.arange(array_length, dtype=np.float64)
    barrier.wait()
    for _ in range(num_iterations):
        x += 0.01  # Element-wise operation
        x[0] += x.mean() * 0.01  # Reduction operation
    print(f"ended {threading.get_ident()}")

num_workers = 1024

barrier = threading.Barrier(num_workers + 1)

workers = []
for _ in range(num_workers):
    t = threading.Thread(target=numpy_worker, args=(barrier, 100, 5000))
    workers.append(t)
    t.start()
barrier.wait()

[t.join() for t in workers]

So clearly the way we're using a critical section to lock the ufunc identity cache doesn't scale well:

PyObject *info;
Py_BEGIN_CRITICAL_SECTION((PyObject *)ufunc);
info = promote_and_get_info_and_ufuncimpl(ufunc,
        ops, signature, op_dtypes, legacy_promotion_is_possible);
Py_END_CRITICAL_SECTION();

I wonder how hard it would be to make the ufunc identity cache per-thread. It's tricky because the cache hangs off of a global ufunc object. If instead we had some kind of thread-local registry mapping ufuncs to the corresponding identity cache, that would avoid the need for any kind of locking.
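A rough Python-level sketch of that idea (purely illustrative; the real cache is a C-level hash table keyed by the operand dtypes, and get_dispatch_cache/cached_lookup are invented names):

import threading

_local = threading.local()  # each thread gets its own registry, so no lock is needed

def get_dispatch_cache(ufunc):
    """Return this thread's private dispatch cache for `ufunc`, creating it lazily."""
    registry = getattr(_local, "caches", None)
    if registry is None:
        registry = _local.caches = {}
    return registry.setdefault(ufunc, {})

def cached_lookup(ufunc, op_dtypes):
    """Look up a previously cached entry for these operand dtypes; None on a cache miss."""
    return get_dispatch_cache(ufunc).get(tuple(op_dtypes))

The obvious cost is that each thread warms up and stores its own copy of the cache.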

@colesbury I'd appreciate your thoughts here.

@colesbury

Yeah, we should make the fast path of promote_and_get_info_and_ufuncimpl scale well. I think the important part is just these few lines, if I understand correctly:

PyObject *info = PyArrayIdentityHash_GetItem(ufunc->_dispatch_cache,
        (PyObject **)op_dtypes);
if (info != NULL && PyObject_TypeCheck(
        PyTuple_GET_ITEM(info, 1), &PyArrayMethod_Type)) {
    /* Found the ArrayMethod and NOT a promoter: return it */
    return info;
}

One option is to use a readers-writer lock. The downsides are:

  • They don't necessarily scale that well, even for read-only workloads, because of contention on the shared reader count (see the sketch after this list). That may or may not be an issue here; it's hard to know without testing it.
  • It's not part of the public Python C API or the C standard library, so we might need to use a mix of pthreads and the Windows API.
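A minimal Python sketch of a reader-preference readers-writer lock, just to make the first downside concrete: every reader has to take the shared counter lock, so the counter itself becomes the contention point (the RWLock class is invented for illustration, not NumPy code):

import threading

class RWLock:
    """Reader-preference readers-writer lock; the shared reader counter is the bottleneck."""
    def __init__(self):
        self._readers = 0
        self._counter_lock = threading.Lock()  # every reader touches this lock
        self._writer_lock = threading.Lock()

    def acquire_read(self):
        with self._counter_lock:
            self._readers += 1
            if self._readers == 1:      # first reader blocks writers
                self._writer_lock.acquire()

    def release_read(self):
        with self._counter_lock:
            self._readers -= 1
            if self._readers == 0:      # last reader lets writers in again
                self._writer_lock.release()

    def acquire_write(self):
        self._writer_lock.acquire()

    def release_write(self):
        self._writer_lock.release()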

I'll think about it some more.

@seberg
Member
seberg commented Nov 22, 2024

Yeah, it's basically a custom dict lookup that needs to be safe (and if that fails, we shouldn't worry about speed).
I could imagine that having a custom hashmap is just too terrible in a free-threaded world. IIRC it was maybe 15%-20% faster for small arrays, but maybe we can (ab)use dicts and still shave off most of that anyway...

@ngoldbaum
Member

Unfortunately it's not just that there's a custom hashmap; the problem is that if you have a cache miss and lots of threads are doing similar ufunc operations, then you're almost certain to have races inside promote_and_get_ufuncimpl outside of the custom hashmap implementation.

@colesbury

I think that's fine -- we can lock around the other parts.

@ngoldbaum
Member

See #27859

@ngoldbaum
Member

See #27896 for a second try at a fix :)
