Python Threading / Concurrency Example
Threads are independent flows of execution that run concurrently within the same process. In a utopian scenario, if you split a big task into 2 threads, these threads would run in parallel, so the work would take half the time.
This is not true in most cases. In CPython, there is a mutex that prevents multiple native threads from executing Python bytecode at once: the GIL (global interpreter lock). This lock is necessary mainly because CPython's memory management is not thread-safe. Notice, though, that I/O, image processing (in C extensions that release the lock), and other potentially blocking operations happen outside the GIL, so it only becomes a bottleneck for threads that spend a lot of time executing Python bytecode.
Nowadays, concurrency is something most applications must be able to handle. Especially in web applications, where one request usually starts a thread, we need to keep concurrency and threading in mind so we can write our programs accordingly.
Threading is also a good tool for optimizing response times. Given a scenario in which we have to process 4 million objects, a natural approach is to divide them into 4 groups of a million objects and process them in 4 separate threads. (Remember the GIL, though: this pays off mainly when each object involves I/O or other work that releases the lock; pure Python CPU work won't run faster in threads.)
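As a minimal sketch of that splitting idea (our addition, not from the original article, assuming the per-object work is I/O-bound), the standard concurrent.futures module spreads a list of objects across a thread pool:

import concurrent.futures
import time

# process_one is a hypothetical stand-in for per-object work; the sleep
# simulates I/O, which releases the GIL so the threads actually overlap.
def process_one(obj):
    time.sleep(0.01)
    return obj * 2

objects = list(range(100))
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(process_one, objects))
print(len(results))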
1. Python _thread module
The _thread module provides low-level primitives for working with threads; let's see an example to understand the concept. But keep in mind that since Python 2.4, the higher-level threading module (covered below) has been the recommended way to work with threads.
The process of spawning a thread is pretty simple. We just need to call a function called start_new_thread, available in the _thread module, which receives a function and the arguments to pass to it. It returns immediately and the thread runs in parallel. Let's see:
import _thread as thread
import time

executed_count = 0

# Define a function for the thread
def print_time(thread_name, delay):
    global executed_count
    count = 0
    while count < 5:
        time.sleep(delay)
        count += 1
        print("%s: %s" % (thread_name, time.ctime(time.time())))
    executed_count += 1

# Create two threads as follows
try:
    threads = [thread.start_new_thread(print_time, ("Thread-1", 2,)),
               thread.start_new_thread(print_time, ("Thread-2", 4,))]
except:
    print("Error: unable to start thread")

while executed_count < 2:
    pass
When we run this script we’ll see an output that looks like:
Thread-1: Mon Dec 21 12:55:23 2015
Thread-2: Mon Dec 21 12:55:25 2015
Thread-1: Mon Dec 21 12:55:25 2015
Thread-1: Mon Dec 21 12:55:27 2015
Thread-2: Mon Dec 21 12:55:29 2015
Thread-1: Mon Dec 21 12:55:29 2015
Thread-1: Mon Dec 21 12:55:31 2015
Thread-2: Mon Dec 21 12:55:33 2015
Thread-2: Mon Dec 21 12:55:37 2015
Thread-2: Mon Dec 21 12:55:41 2015
So, let’s see what is going on there.
We have a variable called executed_count, initialized to zero. Then there is a function called print_time which receives the name of the thread and a delay; every {delay} seconds it prints the date with the thread name, five times, and then increments executed_count.
Then we create two threads, each running our print_time function with a name and a delay assigned to it. Finally, a while loop makes sure the program won't exit until executed_count is equal to or greater than 2.
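A side note: that closing while loop is a busy-wait that burns CPU. Staying within the _thread module, a sketch like the following (our variation, not from the article) waits on locks instead: each worker releases a lock when it finishes, and the main thread blocks on acquire().

import _thread as thread
import time

def print_time(thread_name, delay, done_lock):
    for _ in range(5):
        time.sleep(delay)
        print("%s: %s" % (thread_name, time.ctime(time.time())))
    done_lock.release()  # signal that this worker is done

locks = []
for name, delay in (("Thread-1", 2), ("Thread-2", 4)):
    lock = thread.allocate_lock()
    lock.acquire()  # held until the worker releases it
    locks.append(lock)
    thread.start_new_thread(print_time, (name, delay, lock))

for lock in locks:
    lock.acquire()  # blocks until the corresponding worker finishes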
2. Python threading module
The newer threading module, recommended since Python 2.4, provides much more powerful, high-level support for threads than the _thread module.
2.1. Extending Thread
The most common procedure for spawning a thread using this module is defining a subclass of the Thread class. Once you've done that, you override its __init__ and run methods.
Once you've got your class, you just instantiate an object of it and call its start method. Let's see an example of this:
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, name, sleep_time):
        threading.Thread.__init__(self)
        self.name = name
        self.sleep_time = sleep_time

    def run(self):
        print("{} start".format(self.name))
        time.sleep(self.sleep_time)
        print("{} end".format(self.name))

threads = [MyThread("Thread-{}".format(i), i) for i in range(1, 4)]
for t in threads:
    t.start()
Then we run it and see something like:
Thread-1 start
Thread-2 start
Thread-3 start
Thread-1 end
Thread-2 end
Thread-3 end
Of course, we don’t need to name our threads like that. Each Thread instance has a name with a default value that can be changed as the thread is created. Naming threads is useful in server processes with multiple service threads handling different operations. So, let’s change our code to avoid reinventing the wheel. In our constructor:
def __init__(self, sleep_time):
    threading.Thread.__init__(self)
    self.sleep_time = sleep_time
Then the output will look like:
Thread-1 start
Thread-2 start
Thread-3 start
Thread-1 end
Thread-2 end
Thread-3 end
But it usually isn't so simple; the logic isn't in the run method of our Thread class. So let's make it a little more realistic and do the print and sleep in a method outside our MyThread.
2.2. Getting Current Thread Information
Our problem here is that we don't know our thread's name outside of it, and we don't want that information travelling around in our function's signature. But the threading module has functions that give us access to the current thread's information:
import threading
import time

def my_logic(sleep_time):
    thread_name = threading.current_thread().getName()
    print("{} start".format(thread_name))
    time.sleep(sleep_time)
    print("{} end".format(thread_name))

class MyThread(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time

    def run(self):
        my_logic(self.sleep_time)

threads = [MyThread(i) for i in range(1, 4)]
for t in threads:
    t.start()
By executing threading.current_thread(), we gain access to the current thread's information. Among that information we can find its status (is_alive()), its daemon flag (isDaemon()), and other useful methods.
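For instance, here is a quick sketch of what's available on the object returned by threading.current_thread(), with attribute names as in the standard library:

import threading

t = threading.current_thread()
print(t.name)        # the thread's name ("MainThread" here)
print(t.is_alive())  # True while the thread is running
print(t.daemon)      # the daemon flag
print(t.ident)       # identifier assigned when the thread starts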
2.3. Daemon Threads
Now, let's talk about daemon threads. Until now, our programs waited for every thread to end before actually terminating, but sometimes we don't want that behaviour. If we have a thread pushing status or metrics to a service, we usually don't care whether it has finished when we shut down our program, and we may not want to have to explicitly terminate it before exiting.
Daemon threads run without blocking the main thread from exiting. They are useful when we have services where there may not be an easy way to interrupt the thread or where letting the thread die in the middle of its work does not lose or corrupt data.
To spawn a daemon thread, we just spawn a normal thread and call its setDaemon() method with True as a parameter. By default, threads are not daemonic. Let's see how our program behaves when we make one of those threads a daemon:
threads = [MyThread(i) for i in range(1, 4)]
threads[2].setDaemon(True)
for t in threads:
    t.start()
We are now grabbing the last thread we create, and making it daemon. The output will now look like:
Thread-1 start
Thread-2 start
Thread-3 start
Thread-1 end
Thread-2 end
As you can see, the main thread did not wait for Thread-3 to finish before exiting; daemon threads are terminated when the main thread finishes its execution.
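A side note (our addition): since Python 3.3 the Thread constructor also accepts a daemon keyword argument, and the flag is exposed as the daemon attribute, so the setDaemon() call above can also be written as:

import threading
import time

# equivalent to creating the thread and then calling setDaemon(True)
t = threading.Thread(target=time.sleep, args=(1,), daemon=True)
t.start()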
Let's write something that resembles a real-life problem. Let's make a script that, given a list of URLs, crawls them and saves the HTML in files.
site-crawler.py
import http.client
import threading
import logging

logging.basicConfig(level=logging.INFO,
                    format='(%(threadName)-10s) %(message)s',
                    )

def save(html, file_absolute_path):
    logging.info("saving {} bytes to {}".format(len(html), file_absolute_path))
    with open(file_absolute_path, 'wb+') as file:
        file.write(html)
        file.flush()

def crawl(req):
    logging.info("executing get request for parameters: {}".format(str(req)))
    connection = http.client.HTTPConnection(req["host"], req["port"])
    connection.request("GET", req["path"])
    response = connection.getresponse()
    logging.info("got {} response http code".format(response.status))
    logging.debug("headers: {}".format(str(response.headers)))
    response_content = response.read()
    logging.debug("actual response: {}".format(response_content))
    return response_content

class MyCrawler(threading.Thread):
    def __init__(self, req, file_path):
        threading.Thread.__init__(self, name="Crawler-{}".format(req["host"]))
        self.req = req
        self.file_path = file_path

    def run(self):
        html = crawl(self.req)
        save(html, self.file_path)

def __main__():
    continue_input = True
    threads = []
    while continue_input:
        host = input("host: ")
        port = 80  # int(input("port: "))
        path = "/"  # input("path: ")
        file_path = input("output file absolute path: ")
        req = {"host": host, "port": port, "path": path}
        threads.append(MyCrawler(req, file_path))
        continue_input = input("add another? (y/N) ") == "y"
    for t in threads:
        t.start()
        # t.join()

__main__()
So, what have we got here? This is a script that asks the user to input a host and the absolute path of the file to which it'll write the site's HTML. Running the script as is gives the following session:
host: www.google.com.ar
output file absolute path: /tmp/google-ar.html
add another? (y/N) y
host: www.google.com.br
output file absolute path: /tmp/google-br.html
add another? (y/N) y
host: www.google.com.co
output file absolute path: /tmp/google-co.html
add another? (y/N) y
host: www.google.cl
output file absolute path: /tmp/google-cl.html
add another? (y/N)
(Crawler-www.google.com.ar) executing get request for parameters: {'path': '/', 'host': 'www.google.com.ar', 'port': 80}
(Crawler-www.google.com.br) executing get request for parameters: {'path': '/', 'host': 'www.google.com.br', 'port': 80}
(Crawler-www.google.com.co) executing get request for parameters: {'path': '/', 'host': 'www.google.com.co', 'port': 80}
(Crawler-www.google.cl) executing get request for parameters: {'path': '/', 'host': 'www.google.cl', 'port': 80}
(Crawler-www.google.com.co) got 200 response http code
(Crawler-www.google.com.ar) got 200 response http code
(Crawler-www.google.com.br) got 200 response http code
(Crawler-www.google.com.co) saving 53181 bytes to /tmp/google-co.html
(Crawler-www.google.com.ar) saving 53008 bytes to /tmp/google-ar.html
(Crawler-www.google.com.br) saving 53605 bytes to /tmp/google-br.html
(Crawler-www.google.cl) got 200 response http code
(Crawler-www.google.cl) saving 53069 bytes to /tmp/google-cl.html
As the log shows us, the threads run in parallel; one thread runs while the others perform I/O operations (writing to a file, or connecting to a server via HTTP). Now, there is a commented line in the __main__() function that says t.join(). The join method causes the calling thread to wait for the joined thread to finish before continuing its execution. With the join uncommented, the log looks like:
host: www.google.com.ar
output file absolute path: /tmp/google-ar.html
add another? (y/N) y
host: www.google.com.br
output file absolute path: /tmp/google-ar.html
add another? (y/N) y
host: www.google.com.co
output file absolute path: /tmp/google-co.html
add another? (y/N) y
host: www.google.cl
output file absolute path: /tmp/google-cl.html
add another? (y/N)
(Crawler-www.google.com.ar) executing get request for parameters: {'port': 80, 'path': '/', 'host': 'www.google.com.ar'}
(Crawler-www.google.com.ar) got 200 response http code
(Crawler-www.google.com.ar) saving 52973 bytes to /tmp/google-ar.html
(Crawler-www.google.com.br) executing get request for parameters: {'port': 80, 'path': '/', 'host': 'www.google.com.br'}
(Crawler-www.google.com.br) got 200 response http code
(Crawler-www.google.com.br) saving 54991 bytes to /tmp/google-ar.html
(Crawler-www.google.com.co) executing get request for parameters: {'port': 80, 'path': '/', 'host': 'www.google.com.co'}
(Crawler-www.google.com.co) got 200 response http code
(Crawler-www.google.com.co) saving 53172 bytes to /tmp/google-co.html
(Crawler-www.google.cl) executing get request for parameters: {'port': 80, 'path': '/', 'host': 'www.google.cl'}
(Crawler-www.google.cl) got 200 response http code
(Crawler-www.google.cl) saving 53110 bytes to /tmp/google-cl.html
See? First it crawls Google Argentina, then Brazil, and so on. You're surely wondering why anyone would do this. Well... this is not the only use case of the join method. Imagine these threads were daemons and you had no control over that. You would have to keep a counter of finished threads and wait for it to reach the number of threads launched before exiting the main thread. Not very elegant.
2.4. Joining Threads
Well, there is another way. Let's make these threads daemons, just to experiment a little bit, and wait for all of them to finish before exiting the main thread:
site-crawler.py
import http.client
import threading
import logging

logging.basicConfig(level=logging.INFO,
                    format='(%(threadName)-10s) %(message)s',
                    )

def save(html, file_absolute_path):
    logging.info("saving {} bytes to {}".format(len(html), file_absolute_path))
    with open(file_absolute_path, 'wb+') as file:
        file.write(html)
        file.flush()

def crawl(req):
    logging.info("executing get request for parameters: {}".format(str(req)))
    connection = http.client.HTTPConnection(req["host"], req["port"])
    connection.request("GET", req["path"])
    response = connection.getresponse()
    logging.info("got {} response http code".format(response.status))
    logging.debug("headers: {}".format(str(response.headers)))
    response_content = response.read()
    logging.debug("actual response: {}".format(response_content))
    return response_content

class MyCrawler(threading.Thread):
    def __init__(self, req, file_path):
        threading.Thread.__init__(self, name="Crawler-{}".format(req["host"]), daemon=True)
        self.req = req
        self.file_path = file_path

    def run(self):
        html = crawl(self.req)
        save(html, self.file_path)

def __main__():
    continue_input = True
    threads = []
    while continue_input:
        host = input("host: ")
        port = 80  # int(input("port: "))
        path = "/"  # input("path: ")
        file_path = input("output file absolute path: ")
        req = {"host": host, "port": port, "path": path}
        threads.append(MyCrawler(req, file_path))
        continue_input = input("add another? (y/N) ") == "y"
    for t in threads:
        t.start()
    current_thread = threading.current_thread()
    for thread in threading.enumerate():
        if thread is not current_thread:
            thread.join()

__main__()
Here, we create every thread as a daemon, but we enumerate all active threads by calling threading.enumerate() and join every thread that is not the main one. The behavior remains the same:
host: www.google.com.ar
output file absolute path: /tmp/google-ar.html
add another? (y/N) y
host: www.google.com.br
output file absolute path: /tmp/google-br.html
add another? (y/N) y
host: www.google.com.co
output file absolute path: /tmp/google-co.html
add another? (y/N) y
host: www.google.cl
output file absolute path: /tmp/google-cl.html
add another? (y/N)
(Crawler-www.google.com.ar) executing get request for parameters: {'port': 80, 'host': 'www.google.com.ar', 'path': '/'}
(Crawler-www.google.com.br) executing get request for parameters: {'port': 80, 'host': 'www.google.com.br', 'path': '/'}
(Crawler-www.google.com.co) executing get request for parameters: {'port': 80, 'host': 'www.google.com.co', 'path': '/'}
(Crawler-www.google.cl) executing get request for parameters: {'port': 80, 'host': 'www.google.cl', 'path': '/'}
(Crawler-www.google.com.ar) got 200 response http code
(Crawler-www.google.cl) got 200 response http code
(Crawler-www.google.com.br) got 200 response http code
(Crawler-www.google.com.ar) saving 52980 bytes to /tmp/google-ar.html
(Crawler-www.google.cl) saving 53088 bytes to /tmp/google-cl.html
(Crawler-www.google.com.br) saving 53549 bytes to /tmp/google-br.html
(Crawler-www.google.com.co) got 200 response http code
(Crawler-www.google.com.co) saving 53117 bytes to /tmp/google-co.html
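One more detail worth knowing about join() (our addition, per the standard API): it accepts an optional timeout in seconds, and after it returns you must check is_alive() to find out whether the thread actually finished.

import threading
import time

t = threading.Thread(target=time.sleep, args=(5,))
t.start()
t.join(timeout=1)    # wait at most 1 second
print(t.is_alive())  # True: the worker is still sleeping, the join timed out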
2.5. Timer Threads
Another thing worth pointing out is the existence of the threading.Timer class. It is basically a subclass of Thread which, given a delay and a function, executes the function after the delay has passed. It can also be cancelled at any point before it fires.
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',)

def delayed():
    logging.debug('worker running')
    return

t1 = threading.Timer(3, delayed)
t1.setName('t1')
t2 = threading.Timer(3, delayed)
t2.setName('t2')

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.getName())
time.sleep(2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')
If we execute it:
(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1        ) worker running
Here, we create two timers; both execute the same function after 3 seconds. Then we wait 2 seconds and cancel one of them. In the output we can see that only one of the timers executed the delayed function.
This is useful in scenarios where we need to execute some process if something didn't happen within an interval of time, or even for scheduling.
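As a sketch of the scheduling idea (our own hypothetical Repeater class, not part of the standard library): a Timer whose callback re-arms itself gives a simple repeating task.

import threading

class Repeater:
    def __init__(self, interval, fn):
        self.interval = interval
        self.fn = fn
        self._timer = None

    def _tick(self):
        self.fn()
        self.start()  # re-arm the timer for the next run

    def start(self):
        self._timer = threading.Timer(self.interval, self._tick)
        self._timer.daemon = True  # don't keep the process alive
        self._timer.start()

    def stop(self):
        if self._timer is not None:
            self._timer.cancel()

# usage: print a heartbeat every 2 seconds until stopped
beat = Repeater(2, lambda: print("still alive"))
beat.start()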
2.6. Events: Communication Between Threads
Now, we all know that the idea of using threads is making tasks independent from each other, but sometimes we need a thread to wait for an event caused by another. Python provides a way of signaling between threads. To experiment with this, we'll make a race:
race.py
import threading

class Racer(threading.Thread):
    def __init__(self, name, start_signal):
        threading.Thread.__init__(self, name=name)
        self.start_signal = start_signal

    def run(self):
        self.start_signal.wait()
        print("I, {}, got to the goal!".format(self.name))

class Race:
    def __init__(self, racer_names):
        self.start_signal = threading.Event()
        self.racers = [Racer(name, self.start_signal) for name in racer_names]
        for racer in self.racers:
            racer.start()

    def start(self):
        self.start_signal.set()

def __main__():
    race = Race(["rabbit", "turtle", "cheetah", "monkey",
                 "cow", "horse", "tiger", "lion"])
    race.start()

__main__()
Here, we created a subclass of Thread called Racer, whose run method waits for the event to be set and then prints "I, {name}, got to the goal!".
We create a handful of threads and then set the event, so they all try to start at the same time. The output is interesting:
first run output
I, rabbit, got to the goal!
I, lion, got to the goal!
I, turtle, got to the goal!
I, cheetah, got to the goal!
I, monkey, got to the goal!
I, cow, got to the goal!
I, horse, got to the goal!
I, tiger, got to the goal!
second run output
I, lion, got to the goal!
I, turtle, got to the goal!
I, monkey, got to the goal!
I, horse, got to the goal!
I, cow, got to the goal!
I, tiger, got to the goal!
I, cheetah, got to the goal!
I, rabbit, got to the goal!
Here we can see how the rabbit won the first race but ended last in the second. Either he got tired, or our event behaves just as we wanted it to.
If we didn't use the event and instead started each thread in a loop, the first thread would have an advantage of milliseconds over the last one. And we all know every millisecond counts in computing.
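A few more Event methods are worth knowing (a quick sketch, following the standard API): wait() takes an optional timeout and returns the flag's state, and clear() resets the flag so waiters block again.

import threading

evt = threading.Event()
print(evt.wait(timeout=0.1))  # False: timed out, nobody called set()
evt.set()
print(evt.is_set())           # True: the flag is now set
evt.clear()                   # reset; subsequent wait() calls block again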
2.7. Locking Resources
Sometimes we have a couple of threads accessing the same resource, and if it isn't thread-safe, we don't want them to access it at the same time. One solution to this problem is locking.
A side note: Python's built-in data structures (lists, dictionaries, etc.) are thread-safe as a side effect of having atomic byte-codes for manipulating them (the GIL is not released in the middle of an update). Other data structures implemented in Python, or compound operations on simple types like integers and floats, don't have that protection.
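To see what that lack of protection means, here is a minimal sketch (our addition) of the classic lost-update race on a plain integer; depending on the interpreter version and timing, the final count can come up short.

import threading

count = 0

def increment(n):
    global count
    for _ in range(n):
        count += 1  # read-modify-write: not atomic, updates can be lost

threads = [threading.Thread(target=increment, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(count)  # may be less than 400000 when increments interleave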
Let's imagine, just to make a fun example, that dictionaries in Python are not thread-safe. We'll make an in-memory repository and have a couple of threads read and write data to it:
locking.py
import random
import threading
import logging

logging.basicConfig(level=logging.INFO,
                    format='[%(levelname)s] (%(threadName)-s) (%(module)-s) (%(funcName)-s) %(message)s',
                    filename='/tmp/locking-py.log')

class Repository:
    def __init__(self):
        self.repo = {}
        self.lock = threading.Lock()

    def create(self, entry):
        logging.info("waiting for lock")
        self.lock.acquire()
        try:
            logging.info("acquired lock")
            new_id = len(self.repo.keys())
            entry["id"] = new_id
            self.repo[new_id] = entry
        finally:
            logging.info("releasing lock")
            self.lock.release()

    def find(self, entry_id):
        logging.info("waiting for lock")
        self.lock.acquire()
        try:
            logging.info("acquired lock")
            return self.repo[entry_id]
        except KeyError:
            return None
        finally:
            logging.info("releasing lock")
            self.lock.release()

    def all(self):
        logging.info("waiting for lock")
        self.lock.acquire()
        try:
            logging.info("acquired lock")
            return self.repo
        finally:
            logging.info("releasing lock")
            self.lock.release()

class ProductRepository(Repository):
    def __init__(self):
        Repository.__init__(self)

    def add_product(self, description, price):
        self.create({"description": description, "price": price})

class PurchaseRepository(Repository):
    def __init__(self, product_repository):
        Repository.__init__(self)
        self.product_repository = product_repository

    def add_purchase(self, product_id, qty):
        product = self.product_repository.find(product_id)
        if product is not None:
            total_amount = product["price"] * qty
            self.create({"product_id": product_id, "qty": qty, "total_amount": total_amount})

    def sales_by_product(self, product_id):
        sales = {"product_id": product_id, "qty": 0, "total_amount": 0}
        all_purchases = self.all()
        for k in all_purchases:
            purchase = all_purchases[k]
            if purchase["product_id"] == sales["product_id"]:
                sales["qty"] += purchase["qty"]
                sales["total_amount"] += purchase["total_amount"]
        return sales

class Buyer(threading.Thread):
    def __init__(self, name, product_repository, purchase_repository):
        threading.Thread.__init__(self, name="Buyer-" + name)
        self.product_repository = product_repository
        self.purchase_repository = purchase_repository

    def run(self):
        for i in range(0, 1000):
            max_product_id = len(self.product_repository.all().keys())
            product_id = random.randrange(0, max_product_id + 1, 1)
            qty = random.randrange(0, 100, 1)
            self.purchase_repository.add_purchase(product_id, qty)

class ProviderAuditor(threading.Thread):
    def __init__(self, product_id, purchase_repository):
        threading.Thread.__init__(self, name="Auditor-product_id=" + str(product_id))
        self.product_id = product_id
        self.purchase_repository = purchase_repository

    def run(self):
        logging.info(str(self.purchase_repository.sales_by_product(self.product_id)))

def __main__():
    product_repository = ProductRepository()
    purchase_repository = PurchaseRepository(product_repository)
    input_another_product = True
    while input_another_product:
        description = input("product description: ")
        price = float(input("product price: "))
        product_repository.add_product(description, price)
        input_another_product = input("continue (y/N): ") == "y"
    buyers = [Buyer("carlos", product_repository, purchase_repository),
              Buyer("juan", product_repository, purchase_repository),
              Buyer("mike", product_repository, purchase_repository),
              Buyer("sarah", product_repository, purchase_repository)]
    # start all buyers first, then join them, so they actually run concurrently
    for b in buyers:
        b.start()
    for b in buyers:
        b.join()
    for i in product_repository.all():
        ProviderAuditor(i, purchase_repository).start()

__main__()
As you can see, both resources (purchases and products) extend a Repository class which takes a lock in every access method (let's assume every developer knows they mustn't access the repository's dictionary directly).
This lock guarantees that only one thread at a time can access a given repository. One thing to notice is how the lock is released in a finally block; you should be very careful with that. If you don't put the release in a finally block, then whenever an exception interrupts the function's execution your lock will never be released, and there will be no way to access that resource anymore.
Now, let’s execute this code and input something like:
product description: a
product price: 1
continue (y/N): y
product description: b
product price: 2
continue (y/N): y
product description: c
product price: 3
continue (y/N): y
product description: d
product price: 4
continue (y/N): y
product description: e
product price: 5
continue (y/N): y
product description: f
product price: 6
continue (y/N): y
product description: g
product price: 7
continue (y/N): y
product description: h
product price: 8
continue (y/N): y
product description: i
product price: 9
continue (y/N):
As you can see, the logger was configured to write its output to a file. We don't care about the logging of the Buyer threads, since they perform a thousand actions each and that log won't be readable; but the ProviderAuditor threads log some very interesting information. So we run grep "Auditor" /tmp/locking-py.log and see:
[INFO] (Auditor-product_id=0) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=0) (locking) (all) acquired lock
[INFO] (Auditor-product_id=0) (locking) (all) releasing lock
[INFO] (Auditor-product_id=1) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=0) (locking) (run) {'total_amount': 19850.0, 'product_id': 0, 'qty': 19850}
[INFO] (Auditor-product_id=2) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (all) acquired lock
[INFO] (Auditor-product_id=3) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=4) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=5) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (all) releasing lock
[INFO] (Auditor-product_id=6) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=7) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (run) {'total_amount': 41586.0, 'product_id': 1, 'qty': 20793}
[INFO] (Auditor-product_id=2) (locking) (all) acquired lock
[INFO] (Auditor-product_id=2) (locking) (all) releasing lock
[INFO] (Auditor-product_id=2) (locking) (run) {'total_amount': 60294.0, 'product_id': 2, 'qty': 20098}
[INFO] (Auditor-product_id=3) (locking) (all) acquired lock
[INFO] (Auditor-product_id=3) (locking) (all) releasing lock
[INFO] (Auditor-product_id=3) (locking) (run) {'total_amount': 86752.0, 'product_id': 3, 'qty': 21688}
[INFO] (Auditor-product_id=4) (locking) (all) acquired lock
[INFO] (Auditor-product_id=8) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=4) (locking) (all) releasing lock
[INFO] (Auditor-product_id=4) (locking) (run) {'total_amount': 93960.0, 'product_id': 4, 'qty': 18792}
[INFO] (Auditor-product_id=5) (locking) (all) acquired lock
[INFO] (Auditor-product_id=5) (locking) (all) releasing lock
[INFO] (Auditor-product_id=5) (locking) (run) {'total_amount': 109776.0, 'product_id': 5, 'qty': 18296}
[INFO] (Auditor-product_id=6) (locking) (all) acquired lock
[INFO] (Auditor-product_id=6) (locking) (all) releasing lock
[INFO] (Auditor-product_id=6) (locking) (run) {'total_amount': 140945.0, 'product_id': 6, 'qty': 20135}
[INFO] (Auditor-product_id=7) (locking) (all) acquired lock
[INFO] (Auditor-product_id=7) (locking) (all) releasing lock
[INFO] (Auditor-product_id=7) (locking) (run) {'total_amount': 164152.0, 'product_id': 7, 'qty': 20519}
[INFO] (Auditor-product_id=8) (locking) (all) acquired lock
[INFO] (Auditor-product_id=8) (locking) (all) releasing lock
[INFO] (Auditor-product_id=8) (locking) (run) {'total_amount': 182475.0, 'product_id': 8, 'qty': 20275}
There are our 8 ProviderAuditor
threads, the first one to acquire the lock is the Auditor-product_id=0, then releases it and prints our sales. Then goes Auditor-product_id=1 and 2, 3, 4 and 5 are waiting. And it goes on and on. Now (again, imagining python’s dictionaries are not thread safe) our resources are thread safe.
Another side note: let's imagine another scenario. We have a thread and a resource. The thread locks the resource to write some data, and in the middle, it needs to lock it again to read some other data without releasing the first lock. Well, we have a problem here... Normal Lock objects cannot be acquired more than once, even by the same thread. Changing this is easy: just substitute Lock with RLock, a re-entrant lock which can be acquired again, but only by the thread that already holds it.
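A minimal sketch of that scenario (our addition): with a plain Lock the nested acquisition below would deadlock, while an RLock lets the owning thread in again.

import threading

lock = threading.RLock()  # swap in threading.Lock() here to see the deadlock

def read():
    with lock:
        return "some data"

def write(new_data):
    with lock:
        current = read()  # re-acquires the lock we already hold
        print("replacing {!r} with {!r}".format(current, new_data))

write("new data")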
Locks implement the context manager API and are compatible with the with statement. Using with removes the need to explicitly acquire and release the lock. Let's modify our Repository class code to make it prettier:
locking.py
class Repository:
    def __init__(self):
        self.repo = {}
        self.lock = threading.Lock()

    def create(self, entry):
        logging.info("waiting for lock")
        with self.lock:
            logging.info("acquired lock")
            new_id = len(self.repo.keys())
            entry["id"] = new_id
            self.repo[new_id] = entry

    def find(self, entry_id):
        logging.info("waiting for lock")
        with self.lock:
            try:
                logging.info("acquired lock")
                return self.repo[entry_id]
            except KeyError:
                return None

    def all(self):
        logging.info("waiting for lock")
        with self.lock:
            logging.info("acquired lock")
            return self.repo
And the behavior remains the same:
[INFO] (Auditor-product_id=0) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=0) (locking) (all) acquired lock
[INFO] (Auditor-product_id=1) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=0) (locking) (run) {'product_id': 0, 'total_amount': 19098.0, 'qty': 19098}
[INFO] (Auditor-product_id=2) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (all) acquired lock
[INFO] (Auditor-product_id=3) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=4) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=5) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=1) (locking) (run) {'product_id': 1, 'total_amount': 36344.0, 'qty': 18172}
[INFO] (Auditor-product_id=6) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=2) (locking) (all) acquired lock
[INFO] (Auditor-product_id=7) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=8) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=9) (locking) (all) waiting for lock
[INFO] (Auditor-product_id=2) (locking) (run) {'product_id': 2, 'total_amount': 57555.0, 'qty': 19185}
[INFO] (Auditor-product_id=3) (locking) (all) acquired lock
[INFO] (Auditor-product_id=3) (locking) (run) {'product_id': 3, 'total_amount': 72292.0, 'qty': 18073}
[INFO] (Auditor-product_id=4) (locking) (all) acquired lock
[INFO] (Auditor-product_id=4) (locking) (run) {'product_id': 4, 'total_amount': 88835.0, 'qty': 17767}
[INFO] (Auditor-product_id=5) (locking) (all) acquired lock
[INFO] (Auditor-product_id=5) (locking) (run) {'product_id': 5, 'total_amount': 110754.0, 'qty': 18459}
[INFO] (Auditor-product_id=6) (locking) (all) acquired lock
[INFO] (Auditor-product_id=6) (locking) (run) {'product_id': 6, 'total_amount': 129766.0, 'qty': 18538}
[INFO] (Auditor-product_id=7) (locking) (all) acquired lock
[INFO] (Auditor-product_id=7) (locking) (run) {'product_id': 7, 'total_amount': 152576.0, 'qty': 19072}
[INFO] (Auditor-product_id=8) (locking) (all) acquired lock
[INFO] (Auditor-product_id=8) (locking) (run) {'product_id': 8, 'total_amount': 150210.0, 'qty': 16690}
[INFO] (Auditor-product_id=9) (locking) (all) acquired lock
[INFO] (Auditor-product_id=9) (locking) (run) {'product_id': 9, 'total_amount': 160150.0, 'qty': 16015}
2.8. Limiting Concurrent Access to Resources
Using a lock like that ensures only one thread at a time can access a resource. But imagine database access: maybe you have a connection pool of, at most, 100 connections. In this case you want concurrent access, but you don't want more than 100 threads to use the resource at once. Semaphore comes to help: it behaves like a lock that can be acquired by a fixed number of threads at once. Let's modify our ProviderAuditor code to let at most 5 auditors acquire the semaphore at the same time:
locking.py
class ProviderAuditor(threading.Thread):
    # the semaphore must be shared by all auditors, so we make it a class
    # attribute; a per-instance semaphore would never actually limit anything
    semaphore = threading.Semaphore(5)

    def __init__(self, product_id, purchase_repository):
        threading.Thread.__init__(self, name="Auditor-product_id=" + str(product_id))
        self.product_id = product_id
        self.purchase_repository = purchase_repository

    def run(self):
        with self.semaphore:
            logging.info(str(self.purchase_repository.sales_by_product(self.product_id)))
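And here is a standalone sketch of the connection-pool idea from above (our addition, with hypothetical names; the sleep stands in for real database work): twenty workers, but at most five inside the with block at once.

import threading
import time

pool = threading.Semaphore(5)  # at most 5 "connections" in use at once

def use_connection(worker_id):
    with pool:
        print("worker {} got a connection".format(worker_id))
        time.sleep(0.1)  # pretend to talk to the database

threads = [threading.Thread(target=use_connection, args=(i,)) for i in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()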
2.9. Thread-Specific Data
We often need data that is accessible from only one thread (e.g., identifiers of the current process). The threading module provides the local() class for that: attributes set on a local instance are visible only to the thread that set them. Here goes an example:
process-identifier.py
import logging
import random
from threading import Thread, local

logging.basicConfig(level=logging.INFO,
                    format='[%(levelname)s] (%(threadName)-s) (%(module)-s) (%(funcName)-s) %(message)s',)

# a single shared local(); each thread sees only its own attributes on it
data = local()

def my_method(data):
    try:
        logging.info(str(data.value))
    except AttributeError:
        logging.info("data does not have a value yet")

class MyProcess(Thread):
    def __init__(self):
        Thread.__init__(self)

    def run(self):
        my_method(data)
        data.value = {"process_id": random.randint(0, 1000)}
        my_method(data)

for i in range(0, 4):
    MyProcess().start()
It's pretty easy to use, and very useful. If you run it you'll see something like:
[INFO] (Thread-1) (process-identifier) (my_method) data does not have a value yet
[INFO] (Thread-1) (process-identifier) (my_method) {'process_id': 567}
[INFO] (Thread-2) (process-identifier) (my_method) data does not have a value yet
[INFO] (Thread-2) (process-identifier) (my_method) {'process_id': 477}
[INFO] (Thread-3) (process-identifier) (my_method) data does not have a value yet
[INFO] (Thread-3) (process-identifier) (my_method) {'process_id': 812}
[INFO] (Thread-4) (process-identifier) (my_method) data does not have a value yet
[INFO] (Thread-4) (process-identifier) (my_method) {'process_id': 981}
Every thread has to initialize data.value for itself; values set by other threads will never be visible to it.
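A side note (our addition): subclassing local lets every thread start with initialized attributes, avoiding the AttributeError dance; the subclass's __init__ runs once per thread, the first time that thread touches the object.

import threading
import random

class ProcessData(threading.local):
    def __init__(self):
        # runs once in each thread that accesses the instance
        self.value = {"process_id": random.randint(0, 1000)}

data = ProcessData()
print(data.value)  # already initialized, no AttributeError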
3. Download the Code Project
This was an example of threading in Python.
You can download the full source code of this example here: python-threading