"""Queue-driven worker threads hosted by a ``ThreadPoolExecutor``.

Each queue is served by exactly one long-lived worker.  Callables placed on
a queue are executed in that worker's thread; ``None`` is the sentinel that
tells the worker to stop.
"""

import logging
import threading
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from queue import Queue
from time import perf_counter
from typing import Callable

logger = logging.getLogger(__name__)


def worker(q: Queue) -> str:
    """Consume items from *q* until the ``None`` sentinel arrives.

    Callable items are invoked with no arguments; any other non-``None`` item
    is only logged.  An exception raised by a callable is logged and does not
    stop the loop.  Every item taken from the queue, including the sentinel,
    is acknowledged with ``task_done()`` so ``q.join()`` can be used.

    Args:
        q: Queue to read work items from.

    Returns:
        ``'Ended <thread name>'`` for the thread that ran the loop.
    """
    thread = threading.current_thread()
    logger.info(thread)
    while True:
        # Fetch outside the ``try`` so ``finally`` never acknowledges an item
        # that was not actually retrieved, nor reads unbound locals.
        item = q.get()
        get_time = perf_counter()
        try:
            if item is None:
                break  # ``finally`` below still acknowledges the sentinel
            if callable(item):
                logger.info('Calling %s', item)
                item()
        except Exception as exc:
            # Keep the worker alive, but record the traceback of the failure.
            logger.exception('Error: %s', exc)
        finally:
            q.task_done()
            proc_time = perf_counter() - get_time
            logger.info('%s: %.3fms %s', thread.name, proc_time * 10**3, item)

    return f'Ended {thread.name}'


def run_executor(queues: Iterable[Queue]) -> None:
    """Run one ``worker`` per queue and block until all of them have ended.

    The pool is sized to the number of queues: workers only return on the
    ``None`` sentinel, so a smaller pool would leave the surplus queues
    unserved for as long as the earlier workers stay alive.

    Args:
        queues: Queues to serve; the iterable is consumed once.
    """
    queue_list = list(queues)
    if not queue_list:
        # ThreadPoolExecutor rejects max_workers=0, and there is nothing to do.
        return

    with ThreadPoolExecutor(
        max_workers=len(queue_list),
        thread_name_prefix='AD-App-Thread',
    ) as executor:
        for q in queue_list:
            executor.submit(worker, q=q)


def main(n: int, start: bool = True) -> list[Queue]:
    """Create *n* queues and a background thread that serves them.

    Args:
        n: Number of queues (and therefore workers) to create.
        start: Whether to start the background thread immediately.

    Returns:
        The newly created queues, in creation order.
    """
    queues = [Queue() for _ in range(n)]
    thread = threading.Thread(
        target=run_executor,
        args=(queues,),
        name='AD-ThreadPoolExecutor',
    )
    # NOTE(review): with start=False the thread object is dropped here, so the
    # caller has no handle to start it later; the queues are returned unserved.
    if start:
        thread.start()
    return queues


def dispatch_worker(queue: Queue, func: Callable, *args, **kwargs) -> None:
    """Enqueue ``func(*args, **kwargs)`` on *queue* without blocking.

    Args:
        queue: Queue to put the call on.
        func: Callable to bind.
        *args: Positional arguments bound to *func*.
        **kwargs: Keyword arguments bound to *func*.

    Returns:
        ``None`` (the result of ``Queue.put_nowait``).

    Raises:
        queue.Full: If *queue* is bounded and has no free slot.
    """
    return queue.put_nowait(partial(func, *args, **kwargs))


if __name__ == '__main__':
    from time import sleep

    logging.basicConfig(
        level='INFO',
        format='{asctime} [{levelname}] {message}',
        style='{',
    )

    queues = main(5)
    logger.info('Starting')
    for i in range(5):
        for q in queues:
            q.put_nowait(i)

    logger.info('Sleeping')
    sleep(1.0)

    # Send the stop sentinel to every worker; without it the non-daemon
    # worker threads never return and the process cannot exit.
    for q in queues:
        q.put_nowait(None)
    for q in queues:
        q.join()
    logger.info('Done')