Source code for fennel.app

from functools import wraps
from typing import Any, Callable, Dict

import structlog

from fennel.client import Task as SyncTask
from fennel.client.aio import Task as AsyncTask
from fennel.keys import Keys
from fennel.logging import init_logging
from fennel.settings import Settings
from fennel.utils import EMPTY, get_aredis, get_redis

logger = structlog.get_logger("fennel.client")


[docs]class App: def __init__(self, name: str, **kwargs): """ The app is the main abstraction provided by Fennel. Python functions are decorated via ``@app.task`` to enable background processing. All settings are configured on this object. Parameters ---------- name : str Used to identify this application, e.g. to set which tasks a worker will execute. kwargs Any settings found in :class:`fennel.settings.Settings` Examples -------- >>> from fennel import App ... >>> app = App( ... name='myapp', ... redis_url='redis://127.0.0.1', ... default_retries=3, ... results_enabled=True, ... log_level='info', ... log_format='json', ... autodiscover='**/tasks.py', ... interface='sync', ... ) ... >>> @app.task(retries=1) >>> def foo(x): ... return x ... >>> x = foo.delay(7) # Execute in the background. >>> x AsyncResult(uuid=Tjr75jM3QDOHoLTLyrsY1g) >>> x.get() # Wait for the result. 7 If your code is running in an asynchronous event loop (e.g. via Starlette, FastAPI, Quart), you will want to use the async interface instead: >>> import asyncio ... >>> app = App(name='foo', interface='async') ... >>> @app.task >>> async def bar(x) ... await asyncio.sleep(x) ... return x ... >>> x = await bar.delay(5) >>> await x.status() SENT >>> await x.get() 5 """ self.name: str = name self.settings: Settings = Settings(**kwargs) self.task_map: Dict[str, Callable] = {} self.keys: Keys = Keys.for_app(self) self.client = get_redis(self) self.aioclient = get_aredis(self) if self.settings.interface == "sync": self.task_class = SyncTask else: self.task_class = AsyncTask init_logging(level=self.settings.log_level, format=self.settings.log_format)
[docs] def task(self, func: Callable = None, *, name=None, retries=EMPTY) -> Any: """ A decorator to register a function with the app to enable background processing via the task queue. The worker process (see :class:`fennel.worker.worker`) will need to discover all registered tasks on startup. The means all the modules containing tasks need to be imported. Fennel will import modules found via :attr:`fennel.settings.Settings.autodiscover`, which by default is ``'**/tasks.py'``. Parameters --------- func : Callable The decorated function. name : str The representation used to uniquely identify this task. retries : int The number of attempts at execution after a task has failed (meaning raised any exception). Examples -------- Exposes an interface similar to Celery: >>> @app.task(retries=1) >>> def foo(x): ... return x Tasks can be enqueued for processing via: >>> foo.delay(8) AsyncResult(uuid=q_jb6KaUT-G4tOAoyQ0yaA) The can also be called normally, bypassing the Fennel system entirely: >>> foo(3) 3 By default, tasks are 'fire-and-forget', meaning we will not wait for their completion. They will be executed by worker process and will be retried automatically on failure (using exponential backoff), so we assume tasks are idempotent. You can also wait for the result: >>> x = foo.delay(4) >>> x.status() SENT >>> x.get(timeout=10) 4 If instead you have many tasks and wish to wait for them to complete you can use the waiting primitives provided (you will want to ensure all tasks have retries=0, which you can set by default with an app setting): >>> from fennel.client import gather, wait >>> results = [foo.delay(x) for x in range(10)] >>> gathered = gather(results) # Or: >>> done, pending = wait(results, timeout=2) If your application is running in an event loop you can elect to use the async interface for your fennel app (see :attr:`fennel.settings.Settings.interface`), which uses `aioredis` under the hood to enqueue items, retrieve results, etc, so you will need to await those coroutines: >>> app = App(name='foo', interface='async') >>> >>> @app.task >>> async def bar(x) ... await asyncio.sleep(x) >>> >>> x = await bar.delay(1) >>> await x.status() SUCCESS """ @wraps(func) def wrapper(f: Callable): nonlocal retries retries = retries if retries is not EMPTY else self.settings.default_retries assert type(retries) == int, "retries must be an integer" assert retries >= 0, "retries must be an integer >= 0" nonlocal name name = name or f"{f.__module__}.{f.__qualname__}" assert type(name) == str, "name must be a str" assert name not in self.task_map, f"Task already found by app: {name}" logger.debug("found-task", name=name) self.task_map[name] = f return self.task_class(name, f, retries=retries, app=self) return wrapper(func) if func else wrapper
def __repr__(self) -> str: return f"App(name={self.name}, redis={self.settings.redis_url})"