Semaphores (limit concurrency)

What is a semaphore?

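A semaphore is a counter that limits how many threads can use a resource at the same time. Each acquire() decrements the counter, each release() increments it, and acquire() blocks while the counter is zero.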

Example: only 3 workers may access an API at once.
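
To make the counter behaviour concrete, here is a minimal sketch that uses non-blocking acquires so nothing waits. The helper name try_acquire_n is hypothetical and exists only for this illustration.

```python
import threading


def try_acquire_n(sem: threading.Semaphore, attempts: int) -> list[bool]:
    """Try to acquire `sem` `attempts` times without blocking (hypothetical helper)."""
    # acquire(blocking=False) returns True on success and False when the
    # internal counter is already zero, instead of waiting.
    return [sem.acquire(blocking=False) for _ in range(attempts)]


sem = threading.Semaphore(3)
results = try_acquire_n(sem, 5)
print(results)  # [True, True, True, False, False]

# Releasing one slot makes exactly one more acquire possible.
sem.release()
reacquired = sem.acquire(blocking=False)
print(reacquired)  # True
```

With the default blocking acquire, the fourth and fifth calls would wait until another thread released a slot instead of returning False.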

Example

semaphore.py
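import threading
import time

# Allow at most 3 threads inside the guarded block at once.
sem = threading.Semaphore(3)


def worker(i: int) -> None:
    # `with sem` acquires a slot on entry and releases it on exit,
    # even if the body raises.
    with sem:
        print("start", i)
        time.sleep(0.5)  # simulate a slow API call
        print("end", i)


# Start 10 workers; only 3 can be inside the guarded block at any moment.
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()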
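
The printed output of semaphore.py makes the limit hard to verify by eye. One way to check it, sketched here under the assumption that a lock-protected counter is an acceptable measuring tool, is to record how many workers are inside the semaphore at once. The names MAX_CONCURRENT, active, and peak are introduced for this illustration only.

```python
import threading
import time

MAX_CONCURRENT = 3  # same limit as in semaphore.py
sem = threading.Semaphore(MAX_CONCURRENT)

lock = threading.Lock()  # protects the two counters below
active = 0               # workers currently inside the semaphore
peak = 0                 # highest value `active` has reached


def worker() -> None:
    global active, peak
    with sem:
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate the API call
        with lock:
            active -= 1


threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The semaphore guarantees this value never exceeds MAX_CONCURRENT.
print("peak concurrency:", peak)
```

The separate Lock is needed because the semaphore admits several threads at once, so updates to the shared counters would otherwise race.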

BoundedSemaphore

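BoundedSemaphore raises ValueError if release() is called more times than acquire(), which would push the counter above its initial value.

Use it to catch release bugs that a plain Semaphore would silently accept.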

bounded_semaphore.py
import threading

sem = threading.BoundedSemaphore(1)
sem.acquire()  # counter: 1 -> 0
sem.release()  # counter: 0 -> 1 (back at the initial value)
# sem.release()  # would raise ValueError: the counter would exceed 1
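
To see why the bounded variant helps, the following sketch performs the same extra release() on both classes. The variable names plain, bounded, plain_slots, and caught are illustrative.

```python
import threading

# Plain Semaphore: an unmatched release() silently raises the limit.
plain = threading.Semaphore(1)
plain.release()  # no matching acquire: the counter becomes 2
plain_slots = [plain.acquire(blocking=False) for _ in range(3)]
print(plain_slots)  # [True, True, False] -> two holders despite a limit of 1

# BoundedSemaphore: the same mistake is reported immediately.
bounded = threading.BoundedSemaphore(1)
try:
    bounded.release()  # no matching acquire
except ValueError as exc:
    caught = True
    print("caught ValueError:", exc)
else:
    caught = False
```

With the plain Semaphore the bug only shows up later, as more concurrent access than intended; the bounded version fails at the line that contains the mistake.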

🧪 Try It Yourself

Exercise 1 – Create a Semaphore

Exercise 2 – Semaphore as Context Manager

Exercise 3 – BoundedSemaphore
