diff --git a/ad_threads.py b/ad_threads.py new file mode 100644 index 0000000..5ee941b --- /dev/null +++ b/ad_threads.py @@ -0,0 +1,54 @@ +import threading +from collections.abc import Iterable +from concurrent.futures import ThreadPoolExecutor +from functools import partial +from queue import Queue +from time import perf_counter +from typing import Callable + + +def worker(q: Queue): + thread = threading.current_thread() + print(thread) + while True: + try: + item = q.get() + get_time = perf_counter() + if item is None: + break + elif callable(item): + print(f'Calling {item}') + item() + else: + print(f'{thread.name}: {item}') + except Exception as exc: + print(f'Error: {exc}') + finally: + proc_time = perf_counter() - get_time + q.task_done() + print(f'{proc_time*10**3:.3f}ms') + + # print('Broke worker loop') + return f'Ended {thread.name}' + + +def run_executor(queues: Iterable[Queue]): + with ThreadPoolExecutor(thread_name_prefix='AD-App-Thread') as executor: + for fut in executor.map(worker, queues): + print(fut) + + +def main(n: int, start: bool = True): + queues = [Queue() for _ in range(n)] + thread = threading.Thread( + target=run_executor, + args=(queues,), + name='AD-ThreadPoolExecutor' + ) + if start: + thread.start() + return queues + + +def dispatch_worker(queue: Queue, func: Callable, *args, **kwargs): + return queue.put_nowait(partial(func, *args, **kwargs))