quattro package

Module contents

Task control for asyncio.

class quattro.CancelScope(deadline=None)

Bases: object

Method generated by attrs for class CancelScope.

Parameters:

deadline (float | None)

cancel()

Request cancellation of this scope.

Return type:

None

cancelled_caught: bool

Whether the scope finished by cancellation or not.

property deadline: float | None

class quattro.Deferrer

Bases: AsyncExitStack

A decorator and class to enable deferring functions until the end of a coroutine.

First, apply the Deferrer.enable decorator to a coroutine function. The coroutine function will receive an instance of Deferrer as its first positional argument.

This instance of Deferrer can be used to enter async context managers; these context managers will be exited after the coroutine finishes.

Example

>>> from quattro import Deferrer, TaskGroup
>>> @Deferrer.enable
... async def test(defer: Deferrer) -> None:
...     # Entered here; exited only after test() returns.
...     tg = defer(TaskGroup())
...

Deferrer is a subclass of contextlib.AsyncExitStack, so it supports its full API:

  • AsyncExitStack.enter_async_context (equivalent to calling the instance via Deferrer.__call__, as shown above)

  • AsyncExitStack.push_async_callback

  • AsyncExitStack.push_async_exit

  • BaseExitStack.enter_context

  • BaseExitStack.callback

  • BaseExitStack.push

Loosely inspired by the Go defer statement (https://go.dev/tour/flowcontrol/12).
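Because Deferrer is documented as a subclass of contextlib.AsyncExitStack, the inherited methods listed above should behave as they do on the standard-library class. The following is a minimal sketch that uses AsyncExitStack directly (not quattro) to show the exit order; the helper names resource and notify are made up for illustration.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def resource(name: str):
    # Hypothetical async context manager that records when it is entered and exited.
    events.append(f"enter {name}")
    try:
        yield name
    finally:
        events.append(f"exit {name}")


async def notify(message: str) -> None:
    # Hypothetical async cleanup callback.
    events.append(message)


async def main() -> None:
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(resource("a"))
        stack.push_async_callback(notify, "async callback")
        stack.callback(events.append, "sync callback")
        events.append("body done")
    # Leaving the stack unwinds everything in reverse registration order.


asyncio.run(main())
print(events)
```

Cleanup runs last-in, first-out, which is the same ordering Go applies to its deferred calls.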

classmethod enable(function)

A decorator to be applied to a coroutine function, enabling the use of Deferrer.

The coroutine function should receive an instance of Deferrer as its first positional argument; the decorator provides this instance.

class quattro.TaskGroup(*, concurrency_limit=None)

Bases: TaskGroup

Parameters:

concurrency_limit (int | None) – When provided, use a semaphore to limit the number of non-background tasks that run in parallel.

Changed in version 26.1.0: Added the concurrency_limit parameter.

create_background_task(coro, *, name=None, context=None)

Create, schedule and return a task.

When the task group shuts down, the task will be cancelled.

If this task finishes with an error, the entire task group will be cancelled, like with non-background tasks.

Background tasks do not count against the concurrency limit.

Parameters:

  • coro (_CoroutineLike[T])

  • name (str | None)

  • context (Context | None)

Return type:

Task[T]

create_task(coro, *, name=None, context=None)

Create a new task in this group and return it.

Similar to asyncio.create_task.

Parameters:

  • coro (_CoroutineLike[T])

  • name (str | None)

  • context (Context | None)

Return type:

Task[T]

quattro.defer: Final = <quattro._defer._defer object>

First wrap your coroutine function with defer.enable, then call defer inside it.

Additionally, defer has the following methods:

  • enter_context

quattro.fail_after(seconds)

Create a cancel scope with the given timeout, and raise an error if it is actually cancelled.

This function and move_on_after() are similar in that both create a cancel scope with a given timeout, and if the timeout expires then both will cause CancelledError to be raised within the scope. The difference is that when the CancelledError exception reaches move_on_after(), it’s caught and discarded. When it reaches fail_after(), then it’s caught and TimeoutError is raised in its place.

Parameters:

seconds (float)

Return type:

CancelScope

quattro.fail_at(deadline)

Create a cancel scope with the given deadline, and raise an error if it is actually cancelled.

This function and move_on_at() are similar in that both create a cancel scope with a given absolute deadline, and if the deadline expires then both will cause CancelledError to be raised within the scope. The difference is that when the CancelledError exception reaches move_on_at(), it’s caught and discarded. When it reaches fail_at(), then it’s caught and TimeoutError is raised in its place.

Parameters:

deadline (float)

Return type:

CancelScope

async quattro.gather(*coros, return_exceptions=False, concurrency_limit=None)

A safer version of asyncio.gather.

Uses a task group under the hood to not leak tasks in cases of errors in child tasks.

Parameters:

  • concurrency_limit (int | None) – When provided, limit the number of parallel tasks to this number.

  • coros (Coroutine)

  • return_exceptions (bool)

Return type:

tuple

Notable differences from asyncio.gather:

  • If a child task fails, other unfinished tasks will be cancelled, just like in a TaskGroup.

  • quattro.gather accepts only coroutines, not futures or generators, just like a TaskGroup.

  • When return_exceptions is false (the default), an exception in a child task will cause an ExceptionGroup to bubble out of the top-level gather() call, just like in a TaskGroup.

  • Results are returned as a tuple, not a list.

(See https://docs.python.org/3/library/asyncio-task.html#asyncio.gather)

Added in version 23.1.0.

Changed in version 26.1.0: Added the concurrency_limit parameter.

quattro.get_current_effective_deadline()

Return type:

float

quattro.move_on_after(seconds)

Use as a context manager to create a cancel scope whose deadline is set to now + seconds.

Parameters:

seconds (float)

Return type:

CancelScope

quattro.move_on_at(deadline)

Use as a context manager to create a cancel scope with the given absolute deadline.

Parameters:

deadline (float)

Return type:

CancelScope