#!/usr/bin/env python3
# From https://pybay.com/site_media/slides/raymond2017-keynote/threading.html
"""Demonstrate lock-free coordination of threads with message queues.

Each shared resource is owned by a single daemon "manager" thread that
serves requests from a queue:

* ``counter_manager`` is the only thread that updates ``_counter``.
* ``print_manager`` is the only thread that calls ``print``.

Other threads never touch those resources directly; they put a message
on the corresponding queue instead.
"""

import queue
import threading

# Shared counter; written only by counter_manager().
_counter: int = 0

# Carries integer increments destined for counter_manager().
counter_queue = queue.Queue()

# Carries print jobs (each job is a list of lines) for print_manager().
print_queue = queue.Queue()


def counter_manager() -> None:
    """Apply queued increments to ``_counter``, reporting each new value.

    I have EXCLUSIVE rights to update the ``_counter`` variable.

    Loops forever, so it is meant to run in a daemon thread. For every
    increment taken from ``counter_queue`` it updates the counter and
    queues a two-line print job announcing the new count.
    """
    global _counter
    while True:
        increment = counter_queue.get()
        _counter += increment
        # Queue the report *before* task_done(): once counter_queue.join()
        # returns, every count message is guaranteed to be in print_queue.
        print_queue.put([
            f'The count is {_counter:d}',
            '---------------',
        ])
        counter_queue.task_done()


def print_manager() -> None:
    """Print every line of each job taken from ``print_queue``.

    I have EXCLUSIVE rights to call ``print``.

    Loops forever, so it is meant to run in a daemon thread. A job is
    printed in full before the next one is fetched, so the lines of one
    job are never interleaved with the lines of another.
    """
    while True:
        job = print_queue.get()
        for line in job:
            print(line)
        print_queue.task_done()


def worker() -> None:
    """Request a counter increment of 1 by messaging ``counter_queue``."""
    counter_queue.put(1)


def _start_daemon(target) -> None:
    """Start *target* in a daemon thread so it cannot block interpreter exit."""
    threading.Thread(target=target, daemon=True).start()


def run(num_workers: int = 10) -> None:
    """Start the managers, run the workers, and wait for all output.

    Args:
        num_workers: Number of worker threads to start; each one requests
            a single increment of the counter. Defaults to 10, the value
            that used to be hard-coded. Zero or negative starts none.

    NOTE: every call starts a fresh pair of manager threads that are never
    stopped, so calling this more than once per process leaves several
    managers serving the same queues and breaks the single-owner design.
    """
    # Set up the counter manager and the print manager.
    _start_daemon(counter_manager)
    _start_daemon(print_manager)

    # Start doing work.
    print_queue.put(['Starting up'])
    worker_threads = [
        threading.Thread(target=worker) for _ in range(num_workers)
    ]
    for thread in worker_threads:
        thread.start()
    for thread in worker_threads:
        thread.join()

    # Wait until every increment has been applied (and its report queued)
    # before queuing the final message, then wait for all printing to end.
    counter_queue.join()
    print_queue.put(['Finishing up'])
    print_queue.join()


if __name__ == '__main__':
    run()