Source code for obci.core.asyncio_task_manager


import asyncio
import logging
import threading
import functools
import concurrent.futures
import types
from typing import Optional, Union, Callable, Any

from obci.core import OBCI_DEBUG


[docs]class MessageLoopRunningException(Exception): """ Raised when function requiring to be called from outside message loop was called from inside message loop. """
[docs]def ensure_not_inside_msg_loop(func: Callable[..., Any]) -> Callable[..., Any]: """ Decorator used by AsyncioTaskManager and its subclasses to annotate methods requiring to be called outside message loop. """ @functools.wraps(func) def _wrapper(self, *args, **kwargs): if self._loop == asyncio.get_event_loop() and self._loop.is_running(): raise MessageLoopRunningException('Function was called inside running message loop. ' 'Probably you wanted to use the async version of called function.') return func(self, *args, **kwargs) return _wrapper
[docs]class AsyncioTaskManager: @staticmethod
[docs] def new_event_loop() -> asyncio.BaseEventLoop: """ When AsyncioTaskManager runs with self.owns_asyncio_loop == True this function is called to create a new asyncio event loop. Default implementation returns :func:`asyncio.new_event_loop`. :return: event loop object """ loop = asyncio.new_event_loop() if OBCI_DEBUG: loop.set_debug(True) return loop
_thread_name = 'AsyncioTaskManager' _logger_name = 'AsyncioTaskManager' def __init__(self, asyncio_loop: Optional[asyncio.BaseEventLoop]=None ) -> None: """ AsyncioTaskManager class is used to manage a set of tasks. Tasks are created using `create_task` function. Examples: Can be used as context manager:: with AsyncioTaskManager() as mgr: pass Or as async context manager:: async with AsyncioTaskManager() as mgr: pass :param asyncio_loop: existing message loop or `None` if new message loop should be created """ assert asyncio_loop is None or isinstance(asyncio_loop, asyncio.BaseEventLoop) super().__init__() self._logger = logging.getLogger(self._logger_name) self._tasks = set() self._shutdown_lock = threading.Lock() self._tasks_lock = threading.Lock() self._cancel_tasks_finished = threading.Condition(threading.Lock()) self._create_task_enabled = True if asyncio_loop is not None: self._owns_loop = False self._loop = asyncio_loop self._thread = None else: self._owns_loop = True self._loop = self.new_event_loop() self._thread = threading.Thread(target=self.__thread_func, name=self._thread_name) self._thread.daemon = True self._thread.start() def __del__(self): """ When running with 'self._owns_loop == True' this function requires 'self._thread.daemon == True', otherwise thread will not end properly when program ends and this function won't be called by Python. """ self.shutdown() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown() async def __aenter__(self): assert self._loop is not None assert asyncio.get_event_loop() == self._loop return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.async_shutdown() @property def owns_asyncio_loop(self) -> bool: """ bool: True if owns asyncio message loop. If this instance owns message loop, it will be closed and destroyed on `AsyncioTaskManager`'s shutdown. """ return self._owns_loop
[docs] def create_task(self, coro: types.CoroutineType ) -> Union[asyncio.Future, concurrent.futures.Future]: """ Create a new task and return Future object. New task will be added to an internal tasks list. When task finishes or raises exception or is cancelled it will be automatically removed from that list. When `AsyncioTaskManager` is asked to close it will cancel all tasks on that list. .. note:: Can be called from any thread or/and from any coroutine. """ assert self._loop is not None if not self._create_task_enabled: raise Exception('AsyncioTaskManager is shutting down. Creating new tasks disabled.') async def coro_wrapper(): nonlocal coro try: return await coro except asyncio.CancelledError: self._logger.info('Coroutine cancelled: {}'.format(coro)) raise except Exception: self._logger.info('Exception in coroutine: {}'.format(coro), exc_info=True) raise if asyncio.get_event_loop() == self._loop: future = asyncio.ensure_future(coro_wrapper(), loop=self._loop) else: future = asyncio.run_coroutine_threadsafe(coro_wrapper(), loop=self._loop) with self._tasks_lock: self._tasks.add(future) def future_done_callback(future_obj): if future_obj in self._tasks: with self._tasks_lock: self._tasks.remove(future_obj) future.add_done_callback(future_done_callback) return future
@ensure_not_inside_msg_loop
[docs] def shutdown(self) -> None: """ Can be called from ANY thread, but NOT from event loop. It will block until all pending tasks are finished. Can be called multiple times. """ with self._shutdown_lock: if not self._create_task_enabled: assert len(self._tasks) == 0 return self._create_task_enabled = False if self._owns_loop: if self._thread.is_alive() and self._loop.is_running: self._loop.call_soon_threadsafe(self._loop.stop) self._thread.join() else: if self._loop.is_running: asyncio.run_coroutine_threadsafe(self.__cancel_all_tasks(notify=True), loop=self._loop) self._cancel_tasks_finished.wait() assert len(self._tasks) == 0
[docs] async def async_shutdown(self) -> None: """ Can be called from coroutine running inside message loop. """ self._create_task_enabled = False await self.__cancel_all_tasks(notify=False)
def _cleanup(self) -> None: """ Can be reimplemented to perform extra cleanup tasks. .. note:: Always remember to call `super()._cleanup()` when overloading this function. """ pass async def __cancel_all_tasks(self, notify: bool) -> None: """ Cancel all pending tasks and wait for them to finish. """ try: gather_future = asyncio.gather(*self._tasks) gather_future.cancel() await asyncio.wait_for(gather_future, None, loop=self._loop) finally: try: self._cleanup() finally: if notify: self._cancel_tasks_finished.notify_all() def __thread_func(self) -> None: """ Entry point of message loop thread. Runs new message loop in a new thread when self.owns_asyncio_loop is True. """ assert self._owns_loop and self._loop is not None and self._thread is not None try: if self._logger.isEnabledFor(logging.DEBUG): self._logger.debug("Setting message loop for thread '{}' ({})." .format(self._thread.name, self._thread.ident)) asyncio.set_event_loop(self._loop) self._logger.debug('Starting message loop...') self._loop.run_forever() except Exception: self._logger.exception('Exception in asyncio event loop:') finally: self._create_task_enabled = False self._logger.debug('Message loop stopped. Cancelling all pending tasks.') try: tasks = asyncio.gather(*asyncio.Task.all_tasks(), loop=self._loop, return_exceptions=True) tasks.cancel() try: self._loop.run_until_complete(tasks) except asyncio.CancelledError: self._logger.debug('cancelled: {}'.format(tasks), exc_info=True) pass except Exception: self._logger.exception('Exception during message loop shutdown phase:') raise finally: self._loop.close() except Exception: self._logger.exception('Unknown exception during message loop shutdown phase:') finally: self._cleanup() self._logger.debug('All cleanup tasks finished. Event loop thread will end now.')