Skip to content

psygnal #

Psygnal implements the observer pattern for Python.

It emulates the signal/slot pattern from Qt, but it does not require Qt.

Modules:

  • containers

    Containers backed by psygnal events.

  • qt

    Module that provides Qt-specific functionality for psygnal.

  • testing

    Utilities for testing psygnal Signals.

  • utils

    These utilities may help when using signals and evented objects.

Classes:

  • EmissionInfo

    Tuple containing information about an emission event.

  • EmitLoopError

    Error type raised when an exception occurs during a callback.

  • EventedModel

    A pydantic BaseModel that emits a signal whenever a field value is changed.

  • PathStep

    A single step in a path to a nested signal.

  • Signal

    Declares a signal emitter on a class.

  • SignalGroup

    A collection of signals that can be connected to as a single unit.

  • SignalGroupDescriptor

    Create a psygnal.SignalGroup on first instance attribute access.

  • SignalInstance

    A signal instance (optionally) bound to an object.

Functions:

  • debounced

    Create a debounced function that delays invoking func.

  • emit_queued

    Trigger emissions of all callbacks queued in the current thread.

  • evented

    A decorator to add events to a dataclass.

  • get_async_backend

    Get the current async backend. Returns None if no backend is set.

  • get_evented_namespace

    Return the name of the evented SignalGroup for an object.

  • is_evented

    Return True if the object or its class has been decorated with evented.

  • set_async_backend

    Set the async backend to use. Must be one of: 'asyncio', 'anyio', 'trio'.

  • throttled

    Create a throttled function that invokes func at most once per timeout.

EmissionInfo dataclass #

EmissionInfo(
    signal: SignalInstance,
    args: tuple[Any, ...],
    path: tuple[PathStep, ...] = (),
)

Tuple containing information about an emission event.

Attributes:

  • signal (SignalInstance) –

    The SignalInstance doing the emitting

  • args (tuple) –

    The args that were emitted

  • path (tuple[PathStep, ...]) –

    A tuple of PathStep objects that describe the path to the signal that was emitted. This is useful for nested signals, where the path can be used to determine the hierarchy of the signals. For example, if a signal is emitted from a nested object like bar.foo[3]['user'], the path will contain the steps to get to that object, such as (PathStep(attr='foo'), PathStep(index=3), PathStep(key='user')).

Methods:

  • insert_path

    Return a new EmissionInfo with the given path steps inserted at the front.

insert_path #

insert_path(*path: PathStep) -> EmissionInfo

Return a new EmissionInfo with the given path steps inserted at the front.

Source code in src/psygnal/_group.py
114
115
116
def insert_path(self, *path: PathStep) -> EmissionInfo:
    """Return a new EmissionInfo with the given path steps inserted at the front."""
    return EmissionInfo(self.signal, self.args, (*path, *self.path))

EmitLoopError #

EmitLoopError(
    exc: BaseException,
    signal: SignalInstance | None = None,
    recursion_depth: int = 0,
    reemission: str | None = None,
    emit_queue: Sequence[tuple] = (),
)

Bases: Exception

Error type raised when an exception occurs during a callback.

Source code in src/psygnal/_exceptions.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(
    self,
    exc: BaseException,
    signal: SignalInstance | None = None,
    recursion_depth: int = 0,
    reemission: str | None = None,
    emit_queue: Sequence[tuple] = (),
) -> None:
    # if isinstance(exc, EmitLoopError):
    #     super().__init__("nested EmitLoopError.")
    #     return

    self.__cause__ = exc

    # grab the signal name or repr
    if signal is None:  # pragma: no cover
        sig_name: str = ""
    elif instance := signal.instance:
        inst_class = instance.__class__
        mod = getattr(inst_class, "__module__", "")
        if mod:
            mod += "."
        sig_name = f"{mod}{inst_class.__qualname__}.{signal.name}"
    else:
        sig_name = signal.name

    msg = _build_psygnal_exception_msg(sig_name, exc, recursion_depth)

    # queued emission can be confusing, because the `signal.emit()` call shown
    # in the traceback will not match the emission that actually raised the error.
    if reemission == "queued" and (depth := len(emit_queue) - 1):
        msg += (
            "\nNOTE: reemission is set to 'queued', and this error occurred "
            f"at a queue-depth of {depth}.\nEmitting arguments: {emit_queue[-1]})\n"
        )

    super().__init__(msg)

EventedModel #

EventedModel(_model_self_, **data: Any)

Bases: BaseModel

A pydantic BaseModel that emits a signal whenever a field value is changed.

Important

This class requires pydantic to be installed. You can install directly (pip install pydantic) or by using the psygnal extra: pip install psygnal[pydantic]

In addition to standard pydantic BaseModel properties (see pydantic docs), this class adds the following:

  1. Gains an events attribute that is an instance of psygnal.SignalGroup. This group will have a signal for each field in the model (excluding private attributes and non-mutable fields). Whenever a field in the model is mutated, the corresponding signal will emit with the new value (see example below).

  2. Gains support for properties and property.setters (not supported in pydantic's BaseModel). Enable by adding allow_property_setters = True to your model Config.

  3. If you would like properties (i.e. "computed fields") to emit an event when one of the model fields it depends on is mutated you must set one of the following options in the Config:

    • field_dependencies may be a Dict[str, List[str]], where the keys are the names of properties, and the values are a list of field names (strings) that the property depends on for its value
    • guess_property_dependencies may be set to True to "guess" property dependencies by inspecting the source code of the property getter for.
  4. If you would like to allow custom fields to provide their own json_encoders, you can either:

    1. use the standard pydantic method of adding json_encoders to your model, for each field type you'd like to support: 1. This EventedModel class will additionally look for a _json_encode method on any field types in the model. If a field type declares a _json_encode method, it will be added to the json_encoders dict in the model Config. (Prefer using the standard pydantic method)

Examples:

Standard EventedModel example:

class MyModel(EventedModel):
    x: int = 1


m = MyModel()
m.events.x.connect(lambda v: print(f"new value is {v}"))
m.x = 3  # prints 'new value is 3'

An example of using property_setters and emitting signals when a field dependency is mutated.

class MyModel(EventedModel):
    a: int = 1
    b: int = 1

    @property
    def c(self) -> List[int]:
        return [self.a, self.b]

    @c.setter
    def c(self, val: Sequence[int]) -> None:
        self.a, self.b = val

    class Config:
        allow_property_setters = True
        field_dependencies = {"c": ["a", "b"]}


m = MyModel()
assert m.c == [1, 1]
m.events.c.connect(lambda v: print(f"c updated to {v}"))
m.a = 2  # prints 'c updated to [2, 1]'

Methods:

  • reset

    Reset the state of the model to default values.

  • update

    Update a model in place.

Attributes:

  • events (SignalGroup) –

    Return the SignalGroup containing all events for this model.

Source code in src/psygnal/_evented_model.py
406
407
408
409
410
411
412
def __init__(_model_self_, **data: Any) -> None:
    super().__init__(**data)
    Group = _model_self_.__signal_group__
    # the type error is "cannot assign to a class variable" ...
    # but if we don't use `ClassVar`, then the `dataclass_transform` decorator
    # will add _events: SignalGroup to the __init__ signature, for *all* user models
    _model_self_._events = Group(_model_self_)  # type: ignore [misc]

events property #

events: SignalGroup

Return the SignalGroup containing all events for this model.

reset #

reset() -> None

Reset the state of the model to default values.

Source code in src/psygnal/_evented_model.py
473
474
475
476
477
478
479
480
481
def reset(self) -> None:
    """Reset the state of the model to default values."""
    model_config = self.model_config
    model_fields = _get_fields(type(self))
    for name, value in self._defaults.items():
        if isinstance(value, EventedModel):
            cast("EventedModel", getattr(self, name)).reset()
        elif not model_config.get("frozen") and not model_fields[name].frozen:
            setattr(self, name, value)

update #

update(
    values: Union[EventedModel, dict], recurse: bool = True
) -> None

Update a model in place.

Parameters:

  • values #

    (Union[dict, EventedModel]) –

    Values to update the model with. If an EventedModel is passed it is first converted to a dictionary. The keys of this dictionary must be found as attributes on the current model.

  • recurse #

    (bool, default: True ) –

    If True, recursively update fields that are EventedModels. Otherwise, just update the immediate fields of this EventedModel, which is useful when the declared field type (e.g. Union) can have different realized types with different fields.

Source code in src/psygnal/_evented_model.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def update(self, values: Union["EventedModel", dict], recurse: bool = True) -> None:
    """Update a model in place.

    Parameters
    ----------
    values : Union[dict, EventedModel]
        Values to update the model with. If an EventedModel is passed it is
        first converted to a dictionary. The keys of this dictionary must
        be found as attributes on the current model.
    recurse : bool
        If True, recursively update fields that are EventedModels.
        Otherwise, just update the immediate fields of this EventedModel,
        which is useful when the declared field type (e.g. ``Union``) can have
        different realized types with different fields.
    """
    if isinstance(values, pydantic.BaseModel):
        values = values.model_dump()

    if not isinstance(values, dict):  # pragma: no cover
        raise TypeError(f"values must be a dict or BaseModel. got {type(values)}")

    with self.events._psygnal_relay.paused():  # TODO: reduce?
        for key, value in values.items():
            field = getattr(self, key)
            if isinstance(field, EventedModel) and recurse:
                field.update(value, recurse=recurse)
            else:
                setattr(self, key, value)

PathStep dataclass #

PathStep(
    attr: str | None = None,
    index: SupportsIndex | None = None,
    key: Hashable | None = None,
)

A single step in a path to a nested signal.

This is used to represent a path to a signal that is nested inside an object, such as when a signal is emitted from a nested object or a list. The EmissionInfo.path is a tuple of PathStep objects, where each PathStep represents either:

  • an attribute access (e.g. .foo)
  • an index access (e.g. [3])
  • a key access (e.g. ['user'])

Note

yes, index and key both just get passed to __getitem__ in the end, but we differentiate them here to make it clearer whether the step is a key in a Mapping or an index in a Sequence.

Signal #

Signal(
    *types: type[Any] | Signature,
    description: str = "",
    name: str | None = None,
    check_nargs_on_connect: bool = True,
    check_types_on_connect: bool = False,
    reemission: ReemissionVal = DEFAULT_REEMISSION,
    signal_instance_class: type[SignalInstance]
    | None = None,
)

Declares a signal emitter on a class.

This is class implements the descriptor protocol and is designed to be used as a class attribute, with the supported signature types provided in the constructor:

from psygnal import Signal


class MyEmitter:
    changed = Signal(int)


def receiver(arg: int):
    print("new value:", arg)


emitter = MyEmitter()
emitter.changed.connect(receiver)
emitter.changed.emit(1)  # prints 'new value: 1'

Note

in the example above, MyEmitter.changed is an instance of Signal, and emitter.changed is an instance of SignalInstance. See the documentation on SignalInstance for details on how to connect to and/or emit a signal on an instance of an object that has a Signal.

Parameters:

  • *types #

    (Type[Any] | Signature, default: () ) –

    A sequence of individual types, or a single inspect.Signature object.

  • description #

    (str, default: '' ) –

    Optional descriptive text for the signal. (not used internally).

  • name #

    (str | None, default: None ) –

    Optional name of the signal. If it is not specified then the name of the class attribute that is bound to the signal will be used. default None

  • check_nargs_on_connect #

    (bool, default: True ) –

    Whether to check the number of positional args against signature when connecting a new callback. This can also be provided at connection time using .connect(..., check_nargs=True). By default, True.

  • check_types_on_connect #

    (bool, default: False ) –

    Whether to check the callback parameter types against signature when connecting a new callback. This can also be provided at connection time using .connect(..., check_types=True). By default, False.

  • reemission #

    (Literal['immediate', 'queued', 'latest-only'] | None, default: DEFAULT_REEMISSION ) –

    Determines the order and manner in which connected callbacks are invoked when a callback re-emits a signal. Default is "immediate".

    • "immediate": Signals emitted by callbacks are immediately processed in a deeper emission loop, before returning to process signals emitted at the current level (after all callbacks in the deeper level have been called).

    • "queued": Signals emitted by callbacks are enqueued for emission after the current level of emission is complete. This ensures all connected callbacks are called with the first emitted value, before any of them are called with values emitted while calling callbacks.

    • "latest-only": Signals emitted by callbacks are immediately processed in a deeper emission loop, and remaining callbacks in the current level are never called with the original value.

Methods:

  • current_emitter

    Return currently emitting SignalInstance, if any.

  • sender

    Return currently emitting object, if any.

Attributes:

Source code in src/psygnal/_signal.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def __init__(
    self,
    *types: type[Any] | Signature,
    description: str = "",
    name: str | None = None,
    check_nargs_on_connect: bool = True,
    check_types_on_connect: bool = False,
    reemission: ReemissionVal = DEFAULT_REEMISSION,
    signal_instance_class: type[SignalInstance] | None = None,
) -> None:
    self._name = name
    self.description = description
    self._check_nargs_on_connect = check_nargs_on_connect
    self._check_types_on_connect = check_types_on_connect
    self._reemission = reemission
    self._signal_instance_class = signal_instance_class or SignalInstance
    self._signal_instance_cache: dict[int, SignalInstance] = {}

    if types and isinstance(types[0], Signature):
        self._signature = types[0]
        if len(types) > 1:
            warnings.warn(
                "Only a single argument is accepted when directly providing a"
                f" `Signature`.  These args were ignored: {types[1:]}",
                stacklevel=2,
            )
    else:
        self._signature = _build_signature(*cast("tuple[type[Any], ...]", types))

signature property #

signature: Signature

Signature supported by this Signal.

current_emitter classmethod #

current_emitter() -> SignalInstance | None

Return currently emitting SignalInstance, if any.

This will typically be used in a callback.

Examples:

from psygnal import Signal


def my_callback():
    source = Signal.current_emitter()
Source code in src/psygnal/_signal.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
@classmethod
def current_emitter(cls) -> SignalInstance | None:
    """Return currently emitting `SignalInstance`, if any.

    This will typically be used in a callback.

    Examples
    --------
    ```python
    from psygnal import Signal


    def my_callback():
        source = Signal.current_emitter()
    ```
    """
    return cls._current_emitter

sender classmethod #

sender() -> Any

Return currently emitting object, if any.

This will typically be used in a callback.

Source code in src/psygnal/_signal.py
439
440
441
442
443
444
445
@classmethod
def sender(cls) -> Any:
    """Return currently emitting object, if any.

    This will typically be used in a callback.
    """
    return getattr(cls._current_emitter, "instance", None)

SignalGroup #

SignalGroup(instance: Any = None)

A collection of signals that can be connected to as a single unit.

This class is not intended to be instantiated directly. Instead, it should be subclassed, and the subclass should define Signal instances as class attributes. The SignalGroup will then automatically collect these signals and provide a SignalRelay instance (at group.all) that can be used to connect to all of the signals in the group.

This class is used in both the EventedModels and the evented dataclass patterns. See also: psygnal.SignalGroupDescriptor, which provides convenient and explicit way to create a SignalGroup on a dataclass-like class.

Parameters:

  • instance #

    (Any, default: None ) –

    An object to which this SignalGroup is bound, by default None

Attributes:

  • all (SignalRelay) –

    A special SignalRelay instance that can be used to connect to all signals in this group.

Examples:

from psygnal import Signal, SignalGroup


class MySignals(SignalGroup):
    sig1 = Signal()
    sig2 = Signal()


group = MySignals()
group.all.connect(print)  # connect to all signals in the group

list(group)  # ['sig1', 'sig2']
len(group)  # 2
group.sig1 is group["sig1"]  # True

Methods:

  • is_uniform

    Return true if all signals in the group have the same signature.

  • psygnals_uniform

    Return true if all signals in the group have the same signature.

Source code in src/psygnal/_group.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def __init__(self, instance: Any = None) -> None:
    cls = type(self)
    if not hasattr(cls, "_psygnal_signals"):
        raise TypeError(
            "Cannot instantiate `SignalGroup` directly.  Use a subclass instead."
        )

    self._psygnal_instances = {
        name: (
            sig._create_signal_instance(self)
            if name in cls._psygnal_name_conflicts
            else sig.__get__(self, cls)
        )
        for name, sig in cls._psygnal_signals.items()
    }
    self._psygnal_relay = SignalRelay(self._psygnal_instances, instance)

all property #

all: SignalRelay

SignalInstance that can be used to connect to all signals in this group.

Examples:

from psygnal import Signal, SignalGroup


class MySignals(SignalGroup):
    sig1 = Signal()
    sig2 = Signal()


group = MySignals()
group.sig2.connect(...)  # connect to a single signal by name
group.all.connect(...)  # connect to all signals in the group

instance property #

instance: Any

Object that owns this SignalGroup.

signals property #

signals: Mapping[str, SignalInstance]

A mapping of signal names to SignalInstance instances.

is_uniform classmethod #

is_uniform() -> bool

Return true if all signals in the group have the same signature.

Source code in src/psygnal/_group.py
568
569
570
571
572
573
574
575
576
577
@classmethod
def is_uniform(cls) -> bool:
    """Return true if all signals in the group have the same signature."""
    warnings.warn(
        "The `is_uniform` method on SignalGroup is deprecated. Use "
        "`psygnals_uniform` instead. This will be an error in v0.11.",
        FutureWarning,
        stacklevel=2,
    )
    return cls._psygnal_uniform

psygnals_uniform classmethod #

psygnals_uniform() -> bool

Return true if all signals in the group have the same signature.

Source code in src/psygnal/_group.py
563
564
565
566
@classmethod
def psygnals_uniform(cls) -> bool:
    """Return true if all signals in the group have the same signature."""
    return cls._psygnal_uniform

SignalGroupDescriptor #

SignalGroupDescriptor(
    *,
    equality_operators: dict[str, EqOperator] | None = None,
    warn_on_no_fields: bool = True,
    cache_on_instance: bool = True,
    patch_setattr: bool = True,
    signal_group_class: type[SignalGroup] | None = None,
    collect_fields: bool = True,
    connect_child_events: bool = True,
    signal_aliases: Mapping[str, str | None]
    | FieldAliasFunc
    | None = None,
)

Create a psygnal.SignalGroup on first instance attribute access.

This descriptor is designed to be used as a class attribute on a dataclass-like class (e.g. a dataclass, a pydantic.BaseModel, an attrs class, a msgspec.Struct) On first access of the descriptor on an instance, it will create a SignalGroup bound to the instance, with a SignalInstance for each field in the dataclass.

Important

Using this descriptor will patch the class's __setattr__ method to emit events when fields change. (That patching occurs on first access of the descriptor name on an instance). To prevent this patching, you can set patch_setattr=False when creating the descriptor, but then you will need to manually call emit on the appropriate SignalInstance when you want to emit an event. Or you can use evented_setattr yourself

from psygnal._group_descriptor import evented_setattr
from psygnal import SignalGroupDescriptor
from dataclasses import dataclass
from typing import ClassVar


@dataclass
class Foo:
    x: int
    _events: ClassVar = SignalGroupDescriptor(patch_setattr=False)

    @evented_setattr("_events")  # pass the name of your SignalGroup
    def __setattr__(self, name: str, value: Any) -> None:
        super().__setattr__(name, value)

This currently requires a private import, please open an issue if you would like to depend on this functionality.

Parameters:

  • equality_operators #

    (dict[str, Callable[[Any, Any], bool]], default: None ) –

    A dictionary mapping field names to custom equality operators, where an equality operator is a callable that accepts two arguments and returns True if the two objects are equal. This will be used when comparing the old and new values of a field to determine whether to emit an event. If not provided, the default equality operator is operator.eq, except for numpy arrays, where np.array_equal is used.

  • warn_on_no_fields #

    (bool, default: True ) –

    If True (the default), a warning will be emitted if no mutable dataclass-like fields are found on the object.

  • cache_on_instance #

    (bool, default: True ) –

    If True (the default), a newly-created SignalGroup instance will be cached on the instance itself, so that subsequent accesses to the descriptor will return the same SignalGroup instance. This makes for slightly faster subsequent access, but means that the owner instance will no longer be pickleable. If False, the SignalGroup instance will still be cached, but not on the instance itself.

  • patch_setattr #

    (bool, default: True ) –

    If True (the default), a new __setattr__ method will be created that emits events when fields change. If False, no __setattr__ method will be created. (This will prevent signal emission, and assumes you are using a different mechanism to emit signals when fields change.)

  • signal_group_class #

    (type[SignalGroup] | None, default: None ) –

    A custom SignalGroup class to use, SignalGroup if None, by default None

  • collect_fields #

    (bool, default: True ) –

    Create a signal for each field in the dataclass. If True, the SignalGroup instance will be a subclass of signal_group_class (SignalGroup if it is None). If False, a deepcopy of signal_group_class will be used. Default to True

  • connect_child_events #

    (bool, default: True ) –

    If True, will connect events from all fields on the dataclass whose type is also "evented" to the group on the parent object. An object is considered "evented" if the is_evented function returns True for it (i.e. it has been decorated with @evented, or if it has a SignalGroupDescriptor). This is useful for nested evented dataclasses, where you want to monitor events emitted from arbitrarily deep children on the parent object. By default True.

  • signal_aliases #

    (Mapping[str, str | None] | FieldAliasFunc | None, default: None ) –

    If defined, a mapping between field name and signal name. Field names that are not signal_aliases keys are not aliased (the signal name is the field name). If the dict value is None, do not create a signal associated with this field. If a callable, the signal name is the output of the function applied to the field name. If the output is None, no signal is created for this field. If None, defaults to an empty dict, no aliases. Default to None

Examples:

from typing import ClassVar
from dataclasses import dataclass
from psygnal import SignalGroupDescriptor


@dataclass
class Person:
    name: str
    age: int = 0
    events: ClassVar[SignalGroupDescriptor] = SignalGroupDescriptor()


john = Person("John", 40)
john.events.age.connect(print)
john.age += 1  # prints 41
Source code in src/psygnal/_group_descriptor.py
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
def __init__(
    self,
    *,
    equality_operators: dict[str, EqOperator] | None = None,
    warn_on_no_fields: bool = True,
    cache_on_instance: bool = True,
    patch_setattr: bool = True,
    signal_group_class: type[SignalGroup] | None = None,
    collect_fields: bool = True,
    connect_child_events: bool = True,
    signal_aliases: Mapping[str, str | None] | FieldAliasFunc | None = None,
):
    grp_cls = signal_group_class or SignalGroup
    if not (isinstance(grp_cls, type) and issubclass(grp_cls, SignalGroup)):
        raise TypeError(  # pragma: no cover
            f"'signal_group_class' must be a subclass of SignalGroup, not {grp_cls}"
        )
    if not collect_fields:
        if grp_cls is SignalGroup:
            raise ValueError(
                "Cannot use SignalGroup with `collect_fields=False`. "
                "Use a custom SignalGroup subclass instead."
            )

        if callable(signal_aliases):
            raise ValueError(
                "Cannot use a Callable for `signal_aliases` with "
                "`collect_fields=False`"
            )

    self._name: str | None = None
    self._eqop = tuple(equality_operators.items()) if equality_operators else None
    self._warn_on_no_fields = warn_on_no_fields
    self._cache_on_instance = cache_on_instance
    self._patch_setattr = patch_setattr
    self._connect_child_events = connect_child_events

    self._signal_group_class: type[SignalGroup] = grp_cls
    self._collect_fields = collect_fields
    self._signal_aliases = signal_aliases

    self._signal_groups: dict[int, type[SignalGroup]] = {}

SignalInstance #

SignalInstance(
    signature: Signature | tuple = _empty_signature,
    *,
    instance: Any = None,
    name: str | None = None,
    description: str = "",
    check_nargs_on_connect: bool = True,
    check_types_on_connect: bool = False,
    reemission: ReemissionVal = DEFAULT_REEMISSION,
)

A signal instance (optionally) bound to an object.

In most cases, users will not create a SignalInstance directly -- instead creating a Signal class attribute. This object will be instantiated by the Signal.__get__ method (i.e. the descriptor protocol), when a Signal instance is accessed from an instance of a class with Signal attribute.

However, it is the SignalInstance that you will most often be interacting with when you access the name of a Signal on an instance -- so understanding the SignalInstance API is key to using psygnal.

class Emitter:
    signal = Signal()


e = Emitter()

# when accessed on an *instance* of Emitter,
# the signal attribute will be a SignalInstance
e.signal

# This is what you will use to connect your callbacks
e.signal.connect(some_callback)

Parameters:

  • signature #

    (Signature | None, default: _empty_signature ) –

    The signature that this signal accepts and will emit, by default Signature().

  • instance #

    (Any, default: None ) –

    An object to which this signal is bound. Normally this will be provided by the Signal.__get__ method (see above). However, an unbound SignalInstance may also be created directly. by default None.

  • name #

    (str | None, default: None ) –

    An optional name for this signal. Normally this will be provided by the Signal.__get__ method. by default None

  • check_nargs_on_connect #

    (bool, default: True ) –

    Whether to check the number of positional args against signature when connecting a new callback. This can also be provided at connection time using .connect(..., check_nargs=True). By default, True.

  • check_types_on_connect #

    (bool, default: False ) –

    Whether to check the callback parameter types against signature when connecting a new callback. This can also be provided at connection time using .connect(..., check_types=True). By default, False.

  • reemission #

    (Literal['immediate', 'queued', 'latest-only'] | None, default: DEFAULT_REEMISSION ) –

    See docstring for Signal for details. By default, "immediate".

  • description #

    (str, default: '' ) –

    Optional descriptive text for the signal. (not used internally).

Attributes:

Raises:

  • TypeError

    If signature is neither an instance of inspect.Signature, or a tuple of types.

Methods:

  • block

    Block this signal from emitting.

  • blocked

    Context manager to temporarily block this signal.

  • connect

    Connect a callback (slot) to this signal.

  • connect_setattr

    Bind an object attribute to the emitted value of this signal.

  • connect_setitem

    Bind a container item (such as a dict key) to emitted value of this signal.

  • disconnect

    Disconnect slot from signal.

  • disconnect_setattr

    Disconnect a previously connected attribute setter.

  • disconnect_setitem

    Disconnect a previously connected item setter.

  • emit

    Emit this signal with arguments args.

  • emit_fast

    Fast emit without any checks.

  • pause

    Pause all emission and collect *args tuples from emit().

  • paused

    Context manager to temporarily pause this signal.

  • resume

    Resume (unpause) this signal, emitting everything in the queue.

  • unblock

    Unblock this signal, allowing it to emit.

Source code in src/psygnal/_signal.py
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
def __init__(
    self,
    signature: Signature | tuple = _empty_signature,
    *,
    instance: Any = None,
    name: str | None = None,
    description: str = "",
    check_nargs_on_connect: bool = True,
    check_types_on_connect: bool = False,
    reemission: ReemissionVal = DEFAULT_REEMISSION,
) -> None:
    if isinstance(signature, (list, tuple)):
        signature = _build_signature(*signature)
    elif not isinstance(signature, Signature):  # pragma: no cover
        raise TypeError(
            "`signature` must be either a sequence of types, or an "
            "instance of `inspect.Signature`"
        )

    self._description = description
    self._reemission = ReemissionMode.validate(reemission)
    self._name = name
    self._instance: Callable = self._instance_ref(instance)
    self._args_queue: list[tuple] = []  # filled when paused
    self._signature = signature
    self._check_nargs_on_connect = check_nargs_on_connect
    self._check_types_on_connect = check_types_on_connect
    self._slots: list[WeakCallback] = []
    self._is_blocked: bool = False
    self._is_paused: bool = False
    self._lock = threading.RLock()
    self._emit_queue: deque[tuple] = deque()
    self._recursion_depth: int = 0
    self._max_recursion_depth: int = 0
    self._run_emit_loop_inner: Callable[[], None]
    if self._reemission == ReemissionMode.QUEUED:
        self._run_emit_loop_inner = self._run_emit_loop_queued
    elif self._reemission == ReemissionMode.LATEST:
        self._run_emit_loop_inner = self._run_emit_loop_latest_only
    else:
        self._run_emit_loop_inner = self._run_emit_loop_immediate

    # whether any slots in self._slots have a priority other than 0
    self._priority_in_use = False

description property #

description: str

Description of this SignalInstance.

instance property #

instance: Any

Object that emits this SignalInstance.

name property #

name: str

Name of this SignalInstance.

signature property #

signature: Signature

Signature supported by this SignalInstance.

block #

block(
    exclude: Container[str | SignalInstance] = (),
) -> None

Block this signal from emitting.

NOTE: the exclude argument is only for SignalGroup subclass, but we have to include it here to make mypyc happy.

Source code in src/psygnal/_signal.py
1362
1363
1364
1365
1366
1367
1368
def block(self, exclude: Container[str | SignalInstance] = ()) -> None:
    """Block this signal from emitting.

    NOTE: the `exclude` argument is only for SignalGroup subclass, but we
    have to include it here to make mypyc happy.
    """
    self._is_blocked = True

blocked #

blocked() -> AbstractContextManager[None]

Context manager to temporarily block this signal.

Useful if you need to temporarily block all emission of a given signal, (for example, to avoid a recursive signal loop)

Examples:

class MyEmitter:
    changed = Signal()

    def make_a_change(self):
        self.changed.emit()

obj = MyEmitter()

with obj.changed.blocked()
    obj.make_a_change()  # will NOT emit a changed signal.
Source code in src/psygnal/_signal.py
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
def blocked(self) -> AbstractContextManager[None]:
    """Context manager to temporarily block this signal.

    Useful if you need to temporarily block all emission of a given signal,
    (for example, to avoid a recursive signal loop)

    Examples
    --------
    ```python
    class MyEmitter:
        changed = Signal()

        def make_a_change(self):
            self.changed.emit()

    obj = MyEmitter()

    with obj.changed.blocked()
        obj.make_a_change()  # will NOT emit a changed signal.
    ```
    """
    return _SignalBlocker(self)

connect #

connect(
    *,
    thread: Thread
    | Literal["main", "current"]
    | None = ...,
    check_nargs: bool | None = ...,
    check_types: bool | None = ...,
    unique: bool | Literal["raise"] = ...,
    max_args: int | None = None,
    on_ref_error: RefErrorChoice = ...,
    priority: int = ...,
    emit_on_evented_child_events: bool = ...,
) -> Callable[[F], F]
connect(
    slot: F,
    *,
    thread: Thread
    | Literal["main", "current"]
    | None = ...,
    check_nargs: bool | None = ...,
    check_types: bool | None = ...,
    unique: bool | Literal["raise"] = ...,
    max_args: int | None = None,
    on_ref_error: RefErrorChoice = ...,
    priority: int = ...,
    emit_on_evented_child_events: bool = ...,
) -> F
connect(
    slot: F | None = None,
    *,
    thread: Thread
    | Literal["main", "current"]
    | None = None,
    check_nargs: bool | None = None,
    check_types: bool | None = None,
    unique: bool | Literal["raise"] = False,
    max_args: int | None = None,
    on_ref_error: RefErrorChoice = "warn",
    priority: int = 0,
    emit_on_evented_child_events: bool = False,
) -> Callable[[F], F] | F

Connect a callback (slot) to this signal.

slot is compatible if:

  • it requires no more than the number of positional arguments emitted by this SignalInstance. (It may require less)
  • it has no required keyword arguments (keyword only arguments that have no default).
  • if check_types is True, the parameter types in the callback signature must match the signature of this SignalInstance.

This method may be used as a decorator.

@signal.connect
def my_function(): ...

Important

If a signal is connected with thread != None, then it is up to the user to ensure that psygnal.emit_queued is called, or that one of the backend convenience functions is used (e.g. psygnal.qt.start_emitting_from_queue). Otherwise, callbacks that are connected to signals that are emitted from another thread will never be called.

Parameters:

  • slot #

    (Callable, default: None ) –

    A callable to connect to this signal. If the callable accepts less arguments than the signature of this slot, then they will be discarded when calling the slot.

  • check_nargs #

    (Optional[bool], default: None ) –

    If True and the provided slot requires more positional arguments than the signature of this Signal, raise TypeError. by default True.

  • thread #

    (Thread | Literal['main', 'current'] | None, default: None ) –

    If None (the default), this slot will be invoked immediately when a signal is emitted, from whatever thread emitted the signal. If a thread object is provided, then the callback will only be immediately invoked if the signal is emitted from that thread. Otherwise, the callback will be added to a queue. Note!, when using the thread parameter, the user is responsible for calling psygnal.emit_queued() in the corresponding thread, otherwise the slot will never be invoked. (See note above). (The strings "main" and "current" are also accepted, and will be interpreted as the threading.main_thread() and threading.current_thread(), respectively).

  • check_types #

    (Optional[bool], default: None ) –

    If True, An additional check will be performed to make sure that types declared in the slot signature are compatible with the signature declared by this signal, by default False.

  • unique #

    (Union[bool, str, None], default: False ) –

    If True, returns without connecting if the slot has already been connected. If the literal string "raise" is passed to unique, then a ValueError will be raised if the slot is already connected. By default False.

  • max_args #

    (Optional[int], default: None ) –

    If provided, slot will be called with no more more than max_args when this SignalInstance is emitted. (regardless of how many arguments are emitted).

  • on_ref_error #

    (('raise', 'warn', 'ignore'), default: 'raise' ) –

    What to do if a weak reference cannot be created. If 'raise', a ReferenceError will be raised. If 'warn' (default), a warning will be issued and a strong-reference will be used. If 'ignore' a strong-reference will be used (silently).

  • priority #

    (int, default: 0 ) –

    The priority of the callback. This is used to determine the order in which callbacks are called when multiple are connected to the same signal. Higher priority callbacks are called first. Negative values are allowed. The default is 0.

  • emit_on_evented_child_events #

    (bool, default: False ) –

    If True, and if this is a SignalInstance associated with a specific field on an evented dataclass, and if that field itself is an evented dataclass, then the slot will be called both when the field is set directly, and when a child member of that field is set. For example, if Team is an evented-dataclass with a field leader: Person which is itself an evented-dataclass, then team.events.leader.connect(callback, emit_on_evented_child_events=True) will invoke callback even when team.leader.age is mutated (in addition to when team.leader is set directly).

Raises:

  • TypeError

    If a non-callable object is provided.

  • ValueError

    If the provided slot fails validation, either due to mismatched positional argument requirements, or failed type checking.

  • ValueError

    If unique is 'raise' and slot has already been connected.

Source code in src/psygnal/_signal.py
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
def connect(
    self,
    slot: F | None = None,
    *,
    thread: threading.Thread | Literal["main", "current"] | None = None,
    check_nargs: bool | None = None,
    check_types: bool | None = None,
    unique: bool | Literal["raise"] = False,
    max_args: int | None = None,
    on_ref_error: RefErrorChoice = "warn",
    priority: int = 0,
    emit_on_evented_child_events: bool = False,
) -> Callable[[F], F] | F:
    """Connect a callback (`slot`) to this signal.

    `slot` is compatible if:

    * it requires no more than the number of positional arguments emitted by this
      `SignalInstance`.  (It *may* require less)
    * it has no *required* keyword arguments (keyword only arguments that have
      no default).
    * if `check_types` is `True`, the parameter types in the callback signature must
      match the signature of this `SignalInstance`.

    This method may be used as a decorator.

    ```python
    @signal.connect
    def my_function(): ...
    ```

    !!!important
        If a signal is connected with `thread != None`, then it is up to the user
        to ensure that `psygnal.emit_queued` is called, or that one of the backend
        convenience functions is used (e.g. `psygnal.qt.start_emitting_from_queue`).
        Otherwise, callbacks that are connected to signals that are emitted from
        another thread will never be called.

    Parameters
    ----------
    slot : Callable
        A callable to connect to this signal.  If the callable accepts less
        arguments than the signature of this slot, then they will be discarded when
        calling the slot.
    check_nargs : Optional[bool]
        If `True` and the provided `slot` requires more positional arguments than
        the signature of this Signal, raise `TypeError`. by default `True`.
    thread: Thread | Literal["main", "current"] | None
        If `None` (the default), this slot will be invoked immediately when a signal
        is emitted, from whatever thread emitted the signal. If a thread object is
        provided, then the callback will only be immediately invoked if the signal
        is emitted from that thread.  Otherwise, the callback will be added to a
        queue. **Note!**, when using the `thread` parameter, the user is responsible
        for calling `psygnal.emit_queued()` in the corresponding thread, otherwise
        the slot will never be invoked. (See note above). (The strings `"main"` and
        `"current"` are also accepted, and will be interpreted as the
        `threading.main_thread()` and `threading.current_thread()`, respectively).
    check_types : Optional[bool]
        If `True`, An additional check will be performed to make sure that types
        declared in the slot signature are compatible with the signature
        declared by this signal, by default `False`.
    unique : Union[bool, str, None]
        If `True`, returns without connecting if the slot has already been
        connected.  If the literal string "raise" is passed to `unique`, then a
        `ValueError` will be raised if the slot is already connected.
        By default `False`.
    max_args : Optional[int]
        If provided, `slot` will be called with no more more than `max_args` when
        this SignalInstance is emitted.  (regardless of how many arguments are
        emitted).
    on_ref_error : {'raise', 'warn', 'ignore'}, optional
        What to do if a weak reference cannot be created.  If 'raise', a
        ReferenceError will be raised.  If 'warn' (default), a warning will be
        issued and a strong-reference will be used. If 'ignore' a strong-reference
        will be used (silently).
    priority : int
        The priority of the callback. This is used to determine the order in which
        callbacks are called when multiple are connected to the same signal.
        Higher priority callbacks are called first. Negative values are allowed.
        The default is 0.
    emit_on_evented_child_events : bool
        If `True`, and if this is a SignalInstance associated with a specific field
        on an evented dataclass, and if that field itself is an evented dataclass,
        then the slot will be called both when the field is set directly, *and* when
        a child member of that field is set.
        For example, if `Team` is an evented-dataclass with a field `leader: Person`
        which is itself an evented-dataclass, then
        `team.events.leader.connect(callback, emit_on_evented_child_events=True)`
        will invoke callback even when `team.leader.age` is mutated (in addition to
        when `team.leader` is set directly).

    Raises
    ------
    TypeError
        If a non-callable object is provided.
    ValueError
        If the provided slot fails validation, either due to mismatched positional
        argument requirements, or failed type checking.
    ValueError
        If `unique` is `'raise'` and `slot` has already been connected.
    """
    if check_nargs is None:
        check_nargs = self._check_nargs_on_connect
    if check_types is None:
        check_types = self._check_types_on_connect

    def _wrapper(
        slot: F,
        max_args: int | None = max_args,
        _on_ref_err: RefErrorChoice = on_ref_error,
    ) -> F:
        if not callable(slot):
            raise TypeError(f"Cannot connect to non-callable object: {slot}")

        with self._lock:
            if unique and slot in self:
                if unique == "raise":
                    raise ValueError(
                        "Slot already connect. Use `connect(..., unique=False)` "
                        "to allow duplicate connections"
                    )
                return slot

            slot_sig: Signature | None = None
            if check_nargs and (max_args is None):
                slot_sig, max_args, isqt = self._check_nargs(slot, self.signature)
                if isqt:
                    _on_ref_err = "ignore"
            if check_types:
                slot_sig = slot_sig or signature(slot)
                if not _parameter_types_match(slot, self.signature, slot_sig):
                    extra = f"- Slot types {slot_sig} do not match types in signal."
                    self._raise_connection_error(slot, extra)

            cb = weak_callback(
                slot,
                max_args=max_args,
                finalize=self._try_discard,
                on_ref_error=_on_ref_err,
                priority=priority,
            )
            if thread is not None:
                cb = QueuedCallback(cb, thread=thread)
            self._append_slot(cb)

        if emit_on_evented_child_events:
            self._connect_child_event_listener(slot)
        return slot

    return _wrapper if slot is None else _wrapper(slot)

connect_setattr #

connect_setattr(
    obj: object,
    attr: str,
    maxargs: int | None | object = _NULL,
    *,
    on_ref_error: RefErrorChoice = "warn",
    priority: int = 0,
) -> WeakCallback[None]

Bind an object attribute to the emitted value of this signal.

Equivalent to calling self.connect(functools.partial(setattr, obj, attr)), but with additional weakref safety (i.e. a strong reference to obj will not be retained). The return object can be used to disconnect(), (or you can use disconnect_setattr()).

Parameters:

  • obj #

    (object) –

    An object.

  • attr #

    (str) –

    The name of an attribute on obj that should be set to the value of this signal when emitted.

  • maxargs #

    (Optional[int], default: _NULL ) –

    max number of positional args to accept

  • on_ref_error #

    (RefErrorChoice, default: 'warn' ) –

    What to do if a weak reference cannot be created. If 'raise', a ReferenceError will be raised. If 'warn' (default), a warning will be issued and a strong-reference will be used. If 'ignore' a strong-reference will be used (silently).

  • priority #

    (int, default: 0 ) –

    The priority of the callback. This is used to determine the order in which callbacks are called when multiple are connected to the same signal. Higher priority callbacks are called first. Negative values are allowed. The default is 0.

Returns:

  • Tuple

    (weakref.ref, name, callable). Reference to the object, name of the attribute, and setattr closure. Can be used to disconnect the slot.

Raises:

Examples:

>>> class T:
...     sig = Signal(int)
>>> class SomeObj:
...     x = 1
>>> t = T()
>>> my_obj = SomeObj()
>>> t.sig.connect_setattr(my_obj, "x")
>>> t.sig.emit(5)
>>> assert my_obj.x == 5
Source code in src/psygnal/_signal.py
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
def connect_setattr(
    self,
    obj: object,
    attr: str,
    maxargs: int | None | object = _NULL,
    *,
    on_ref_error: RefErrorChoice = "warn",
    priority: int = 0,
) -> WeakCallback[None]:
    """Bind an object attribute to the emitted value of this signal.

    Equivalent to calling `self.connect(functools.partial(setattr, obj, attr))`,
    but with additional weakref safety (i.e. a strong reference to `obj` will not
    be retained). The return object can be used to
    [`disconnect()`][psygnal.SignalInstance.disconnect], (or you can use
    [`disconnect_setattr()`][psygnal.SignalInstance.disconnect_setattr]).

    Parameters
    ----------
    obj : object
        An object.
    attr : str
        The name of an attribute on `obj` that should be set to the value of this
        signal when emitted.
    maxargs : Optional[int]
        max number of positional args to accept
    on_ref_error: {'raise', 'warn', 'ignore'}, optional
        What to do if a weak reference cannot be created.  If 'raise', a
        ReferenceError will be raised.  If 'warn' (default), a warning will be
        issued and a strong-reference will be used. If 'ignore' a strong-reference
        will be used (silently).
    priority : int
        The priority of the callback. This is used to determine the order in which
        callbacks are called when multiple are connected to the same signal.
        Higher priority callbacks are called first. Negative values are allowed.
        The default is 0.

    Returns
    -------
    Tuple
        (weakref.ref, name, callable).  Reference to the object, name of the
        attribute, and setattr closure.  Can be used to disconnect the slot.

    Raises
    ------
    ValueError
        If this is not a single-value signal
    AttributeError
        If `obj` has no attribute `attr`.

    Examples
    --------
    >>> class T:
    ...     sig = Signal(int)
    >>> class SomeObj:
    ...     x = 1
    >>> t = T()
    >>> my_obj = SomeObj()
    >>> t.sig.connect_setattr(my_obj, "x")
    >>> t.sig.emit(5)
    >>> assert my_obj.x == 5
    """
    if maxargs is _NULL:
        warnings.warn(
            "The default value of maxargs will change from `None` to `1` in "
            "version 0.11. To silence this warning, provide an explicit value for "
            "maxargs (`None` for current behavior, `1` for future behavior).",
            FutureWarning,
            stacklevel=2,
        )
        maxargs = None

    if not hasattr(obj, attr):
        raise AttributeError(f"Object {obj} has no attribute {attr!r}")

    with self._lock:
        caller = WeakSetattr(
            obj,
            attr,
            max_args=cast("int | None", maxargs),
            finalize=self._try_discard,
            on_ref_error=on_ref_error,
            priority=priority,
        )
        self._append_slot(caller)
    return caller

connect_setitem #

connect_setitem(
    obj: object,
    key: str,
    maxargs: int | None | object = _NULL,
    *,
    on_ref_error: RefErrorChoice = "warn",
    priority: int = 0,
) -> WeakCallback[None]

Bind a container item (such as a dict key) to emitted value of this signal.

Equivalent to calling self.connect(functools.partial(obj.__setitem__, attr)), but with additional weakref safety (i.e. a strong reference to obj will not be retained). The return object can be used to disconnect(), (or you can use disconnect_setitem()).

Parameters:

  • obj #

    (object) –

    An object.

  • key #

    (str) –

    Name of the key in obj that should be set to the value of this signal when emitted

  • maxargs #

    (Optional[int], default: _NULL ) –

    max number of positional args to accept

  • on_ref_error #

    (RefErrorChoice, default: 'warn' ) –

    What to do if a weak reference cannot be created. If 'raise', a ReferenceError will be raised. If 'warn' (default), a warning will be issued and a strong-reference will be used. If 'ignore' a strong-reference will be used (silently).

  • priority #

    (int, default: 0 ) –

    The priority of the callback. This is used to determine the order in which callbacks are called when multiple are connected to the same signal. Higher priority callbacks are called first. Negative values are allowed. The default is 0.

Returns:

  • Tuple

    (weakref.ref, name, callable). Reference to the object, name of the attribute, and setitem closure. Can be used to disconnect the slot.

Raises:

  • ValueError

    If this is not a single-value signal

  • TypeError

    If obj does not support setitem.

Examples:

>>> class T:
...     sig = Signal(int)
>>> t = T()
>>> my_obj = dict()
>>> t.sig.connect_setitem(my_obj, "x")
>>> t.sig.emit(5)
>>> assert my_obj == {"x": 5}
Source code in src/psygnal/_signal.py
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
def connect_setitem(
    self,
    obj: object,
    key: str,
    maxargs: int | None | object = _NULL,
    *,
    on_ref_error: RefErrorChoice = "warn",
    priority: int = 0,
) -> WeakCallback[None]:
    """Bind a container item (such as a dict key) to emitted value of this signal.

    Equivalent to calling `self.connect(functools.partial(obj.__setitem__, attr))`,
    but with additional weakref safety (i.e. a strong reference to `obj` will not
    be retained). The return object can be used to
    [`disconnect()`][psygnal.SignalInstance.disconnect], (or you can use
    [`disconnect_setitem()`][psygnal.SignalInstance.disconnect_setitem]).

    Parameters
    ----------
    obj : object
        An object.
    key : str
        Name of the key in `obj` that should be set to the value of this
        signal when emitted
    maxargs : Optional[int]
        max number of positional args to accept
    on_ref_error: {'raise', 'warn', 'ignore'}, optional
        What to do if a weak reference cannot be created.  If 'raise', a
        ReferenceError will be raised.  If 'warn' (default), a warning will be
        issued and a strong-reference will be used. If 'ignore' a strong-reference
        will be used (silently).
    priority : int
        The priority of the callback. This is used to determine the order in which
        callbacks are called when multiple are connected to the same signal.
        Higher priority callbacks are called first. Negative values are allowed.
        The default is 0.

    Returns
    -------
    Tuple
        (weakref.ref, name, callable).  Reference to the object, name of the
        attribute, and setitem closure.  Can be used to disconnect the slot.

    Raises
    ------
    ValueError
        If this is not a single-value signal
    TypeError
        If `obj` does not support __setitem__.

    Examples
    --------
    >>> class T:
    ...     sig = Signal(int)
    >>> t = T()
    >>> my_obj = dict()
    >>> t.sig.connect_setitem(my_obj, "x")
    >>> t.sig.emit(5)
    >>> assert my_obj == {"x": 5}
    """
    if maxargs is _NULL:
        warnings.warn(
            "The default value of maxargs will change from `None` to `1` in"
            "version 0.11. To silence this warning, provide an explicit value for "
            "maxargs (`None` for current behavior, `1` for future behavior).",
            FutureWarning,
            stacklevel=2,
        )
        maxargs = None

    if not hasattr(obj, "__setitem__"):
        raise TypeError(f"Object {obj} does not support __setitem__")

    with self._lock:
        caller = WeakSetitem(
            obj,
            key,
            max_args=cast("int | None", maxargs),
            finalize=self._try_discard,
            on_ref_error=on_ref_error,
            priority=priority,
        )
        self._append_slot(caller)

    return caller

disconnect #

disconnect(
    slot: Callable | None = None, missing_ok: bool = True
) -> None

Disconnect slot from signal.

Parameters:

  • slot #

    (callable, default: None ) –

    The specific slot to disconnect. If None, all slots will be disconnected, by default None

  • missing_ok #

    (Optional[bool], default: True ) –

    If False and the provided slot is not connected, raises ValueError. by defaultTrue`

Raises:

  • ValueError

    If slot is not connected and missing_ok is False.

Source code in src/psygnal/_signal.py
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
def disconnect(self, slot: Callable | None = None, missing_ok: bool = True) -> None:
    """Disconnect slot from signal.

    Parameters
    ----------
    slot : callable, optional
        The specific slot to disconnect.  If `None`, all slots will be disconnected,
        by default `None`
    missing_ok : Optional[bool]
        If `False` and the provided `slot` is not connected, raises `ValueError.
        by default `True`

    Raises
    ------
    ValueError
        If `slot` is not connected and `missing_ok` is False.
    """
    with self._lock:
        if slot is None:
            # NOTE: clearing an empty list is actually a RuntimeError in Qt
            self._remove_slot("all")
            return

        idx = self._slot_index(slot)
        if idx != -1:
            self._remove_slot(idx)
        elif not missing_ok:
            raise ValueError(f"slot is not connected: {slot}")

disconnect_setattr #

disconnect_setattr(
    obj: object, attr: str, missing_ok: bool = True
) -> None

Disconnect a previously connected attribute setter.

Parameters:

  • obj #

    (object) –

    An object.

  • attr #

    (str) –

    The name of an attribute on obj that was previously used for connect_setattr.

  • missing_ok #

    (bool, default: True ) –

    If False and the provided slot is not connected, raises ValueError. by default True

Raises:

  • ValueError

    If missing_ok is True and no attribute setter is connected.

Source code in src/psygnal/_signal.py
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
def disconnect_setattr(
    self, obj: object, attr: str, missing_ok: bool = True
) -> None:
    """Disconnect a previously connected attribute setter.

    Parameters
    ----------
    obj : object
        An object.
    attr : str
        The name of an attribute on `obj` that was previously used for
        `connect_setattr`.
    missing_ok : bool
        If `False` and the provided `slot` is not connected, raises `ValueError`.
        by default `True`

    Raises
    ------
    ValueError
        If `missing_ok` is `True` and no attribute setter is connected.
    """
    with self._lock:
        cb = WeakSetattr(obj, attr, on_ref_error="ignore")
        self._try_discard(cb, missing_ok)

disconnect_setitem #

disconnect_setitem(
    obj: object, key: str, missing_ok: bool = True
) -> None

Disconnect a previously connected item setter.

Parameters:

  • obj #

    (object) –

    An object.

  • key #

    (str) –

    The name of a key in obj that was previously used for connect_setitem.

  • missing_ok #

    (bool, default: True ) –

    If False and the provided slot is not connected, raises ValueError. by default True

Raises:

  • ValueError

    If missing_ok is True and no item setter is connected.

Source code in src/psygnal/_signal.py
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
def disconnect_setitem(
    self, obj: object, key: str, missing_ok: bool = True
) -> None:
    """Disconnect a previously connected item setter.

    Parameters
    ----------
    obj : object
        An object.
    key : str
        The name of a key in `obj` that was previously used for
        `connect_setitem`.
    missing_ok : bool
        If `False` and the provided `slot` is not connected, raises `ValueError`.
        by default `True`

    Raises
    ------
    ValueError
        If `missing_ok` is `True` and no item setter is connected.
    """
    if not hasattr(obj, "__setitem__"):
        raise TypeError(f"Object {obj} does not support __setitem__")

    with self._lock:
        caller = WeakSetitem(obj, key, on_ref_error="ignore")
        self._try_discard(caller, missing_ok)

emit #

emit(
    *args: Any,
    check_nargs: bool = False,
    check_types: bool = False,
) -> None

Emit this signal with arguments args.

Note

check_args and check_types both add overhead when calling emit.

Parameters:

  • *args #

    (Any, default: () ) –

    These arguments will be passed when calling each slot (unless the slot accepts fewer arguments, in which case extra args will be discarded.)

  • check_nargs #

    (bool, default: False ) –

    If False and the provided arguments cannot be successfully bound to the signature of this Signal, raise TypeError. Incurs some overhead. by default False.

  • check_types #

    (bool, default: False ) –

    If False and the provided arguments do not match the types declared by the signature of this Signal, raise TypeError. Incurs some overhead. by default False.

Raises:

  • TypeError

    If check_nargs and/or check_types are True, and the corresponding checks fail.

Source code in src/psygnal/_signal.py
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
def emit(
    self, *args: Any, check_nargs: bool = False, check_types: bool = False
) -> None:
    """Emit this signal with arguments `args`.

    !!! note

        `check_args` and `check_types` both add overhead when calling emit.

    Parameters
    ----------
    *args : Any
        These arguments will be passed when calling each slot (unless the slot
        accepts fewer arguments, in which case extra args will be discarded.)
    check_nargs : bool
        If `False` and the provided arguments cannot be successfully bound to the
        signature of this Signal, raise `TypeError`.  Incurs some overhead.
        by default False.
    check_types : bool
        If `False` and the provided arguments do not match the types declared by
        the signature of this Signal, raise `TypeError`.  Incurs some overhead.
        by default False.

    Raises
    ------
    TypeError
        If `check_nargs` and/or `check_types` are `True`, and the corresponding
        checks fail.
    """
    if self._is_blocked:
        return

    if check_nargs:
        try:
            self.signature.bind(*args)
        except TypeError as e:
            raise TypeError(
                f"Cannot emit args {args} from signal {self!r} with "
                f"signature {self.signature}:\n{e}"
            ) from e

    if check_types and not _parameter_types_match(
        lambda: None, self.signature, _build_signature(*[type(a) for a in args])
    ):
        raise TypeError(
            f"Types provided to '{self.name}.emit' "
            f"{tuple(type(a).__name__ for a in args)} do not match signal "
            f"signature: {self.signature}"
        )

    if self._is_paused:
        self._args_queue.append(args)
        return

    if SignalInstance._debug_hook is not None:
        from ._group import EmissionInfo

        SignalInstance._debug_hook(EmissionInfo(self, args))

    self._run_emit_loop(args)

emit_fast #

emit_fast(*args: Any) -> None

Fast emit without any checks.

This method can be up to 10x faster than emit(), but it lacks most of the features and safety checks of emit(). Use with caution. Specifically:

  • It does not support check_nargs or check_types
  • It does not use any thread safety locks.
  • It is not possible to query the emitter with Signal.current_emitter()
  • It is not possible to query the sender with Signal.sender()
  • It does not support "queued" or "latest-only" reemission modes for nested emits. It will always use "immediate" mode, wherein signals emitted by callbacks are immediately processed in a deeper emission loop.

It DOES, however, support paused() and blocked()

Parameters:

  • *args #

    (Any, default: () ) –

    These arguments will be passed when calling each slot (unless the slot accepts fewer arguments, in which case extra args will be discarded.)

Source code in src/psygnal/_signal.py
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
def emit_fast(self, *args: Any) -> None:
    """Fast emit without any checks.

    This method can be up to 10x faster than `emit()`, but it lacks most of the
    features and safety checks of `emit()`. Use with caution.  Specifically:

    - It does not support `check_nargs` or `check_types`
    - It does not use any thread safety locks.
    - It is not possible to query the emitter with `Signal.current_emitter()`
    - It is not possible to query the sender with `Signal.sender()`
    - It does not support "queued" or "latest-only" reemission modes for nested
      emits. It will always use "immediate" mode, wherein signals emitted by
      callbacks are immediately processed in a deeper emission loop.

    It DOES, however, support `paused()` and `blocked()`

    Parameters
    ----------
    *args : Any
        These arguments will be passed when calling each slot (unless the slot
        accepts fewer arguments, in which case extra args will be discarded.)
    """
    if self._is_blocked:
        return

    if self._is_paused:
        self._args_queue.append(args)
        return

    if self._recursion_depth >= RECURSION_LIMIT:
        raise RecursionError(
            f"Psygnal recursion limit ({RECURSION_LIMIT}) reached when emitting "
            f"signal {self.name!r} with args {args}"
        )

    self._recursion_depth += 1
    try:
        for caller in self._slots:
            caller.cb(args)
    except RecursionError as e:
        raise RecursionError(
            f"RecursionError when emitting signal {self.name!r} with args {args}"
        ) from e
    except EmitLoopError as e:  # pragma: no cover
        raise e
    except Exception as cb_err:
        loop_err = EmitLoopError(exc=cb_err, signal=self).with_traceback(
            cb_err.__traceback__
        )
        # this comment will show up in the traceback
        raise loop_err from cb_err  # emit() call ABOVE || callback error BELOW
    finally:
        if self._recursion_depth > 0:
            self._recursion_depth -= 1

pause #

pause() -> None

Pause all emission and collect *args tuples from emit().

args passed to emit will be collected and re-emitted when resume() is called. For a context manager version, see paused().

Source code in src/psygnal/_signal.py
1397
1398
1399
1400
1401
1402
1403
def pause(self) -> None:
    """Pause all emission and collect *args tuples from emit().

    args passed to `emit` will be collected and re-emitted when `resume()` is
    called. For a context manager version, see `paused()`.
    """
    self._is_paused = True

paused #

paused(
    reducer: ReducerFunc | None = None, initial: Any = _NULL
) -> AbstractContextManager[None]

Context manager to temporarily pause this signal.

Parameters:

  • reducer #

    (Callable | None, default: None ) –

    A optional function to reduce the args collected while paused into a single emitted group of args. If not provided, all emissions will be re-emitted as they were collected when the signal is resumed. May be:

    • a function that takes two args tuples and returns a single args tuple. This will be passed to functools.reduce and is expected to reduce all collected/emitted args into a single tuple. For example, three emit(1) events would be reduced and re-emitted as follows: self.emit(*functools.reduce(reducer, [(1,), (1,), (1,)]))
    • a function that takes a single argument (an iterable of args tuples) and returns a tuple (the reduced args). This will be not be passed to functools.reduce. If reducer is a function that takes a single argument, initial will be ignored.
  • initial #

    (Any, default: _NULL ) –

    initial value to pass to functools.reduce

Examples:

>>> with obj.signal.paused(lambda a, b: (a[0].union(set(b)),), (set(),)):
...     t.sig.emit(1)
...     t.sig.emit(2)
...     t.sig.emit(3)
>>> # results in obj.signal.emit({1, 2, 3})
Source code in src/psygnal/_signal.py
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
def paused(
    self, reducer: ReducerFunc | None = None, initial: Any = _NULL
) -> AbstractContextManager[None]:
    """Context manager to temporarily pause this signal.

    Parameters
    ----------
    reducer : Callable | None
        A optional function to reduce the args collected while paused into a single
        emitted group of args.  If not provided, all emissions will be re-emitted
        as they were collected when the signal is resumed. May be:

        - a function that takes two args tuples and returns a single args tuple.
          This will be passed to `functools.reduce` and is expected to reduce all
          collected/emitted args into a single tuple.
          For example, three `emit(1)` events would be reduced and re-emitted as
          follows: `self.emit(*functools.reduce(reducer, [(1,), (1,), (1,)]))`
        - a function that takes a single argument (an iterable of args tuples) and
          returns a tuple (the reduced args). This will be *not* be passed to
          `functools.reduce`. If `reducer` is a function that takes a single
          argument, `initial` will be ignored.
    initial: any, optional
        initial value to pass to `functools.reduce`

    Examples
    --------
    >>> with obj.signal.paused(lambda a, b: (a[0].union(set(b)),), (set(),)):
    ...     t.sig.emit(1)
    ...     t.sig.emit(2)
    ...     t.sig.emit(3)
    >>> # results in obj.signal.emit({1, 2, 3})
    """
    return _SignalPauser(self, reducer, initial)

resume #

resume(
    reducer: ReducerFunc | None = None, initial: Any = _NULL
) -> None

Resume (unpause) this signal, emitting everything in the queue.

Parameters:

  • reducer #

    (Callable | None, default: None ) –

    A optional function to reduce the args collected while paused into a single emitted group of args. If not provided, all emissions will be re-emitted as they were collected when the signal is resumed. May be:

    • a function that takes two args tuples and returns a single args tuple. This will be passed to functools.reduce and is expected to reduce all collected/emitted args into a single tuple. For example, three emit(1) events would be reduced and re-emitted as follows: self.emit(*functools.reduce(reducer, [(1,), (1,), (1,)]))
    • a function that takes a single argument (an iterable of args tuples) and returns a tuple (the reduced args). This will be not be passed to functools.reduce. If reducer is a function that takes a single argument, initial will be ignored.
  • initial #

    (Any, default: _NULL ) –

    initial value to pass to functools.reduce

Examples:

>>> class T:
...     sig = Signal(int)
>>> t = T()
>>> t.sig.pause()
>>> t.sig.emit(1)
>>> t.sig.emit(2)
>>> t.sig.emit(3)
>>> t.sig.resume(lambda a, b: (a[0].union(set(b)),), (set(),))
>>> # results in t.sig.emit({1, 2, 3})
Source code in src/psygnal/_signal.py
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
def resume(self, reducer: ReducerFunc | None = None, initial: Any = _NULL) -> None:
    """Resume (unpause) this signal, emitting everything in the queue.

    Parameters
    ----------
    reducer : Callable | None
        A optional function to reduce the args collected while paused into a single
        emitted group of args.  If not provided, all emissions will be re-emitted
        as they were collected when the signal is resumed. May be:

        - a function that takes two args tuples and returns a single args tuple.
          This will be passed to `functools.reduce` and is expected to reduce all
          collected/emitted args into a single tuple.
          For example, three `emit(1)` events would be reduced and re-emitted as
          follows: `self.emit(*functools.reduce(reducer, [(1,), (1,), (1,)]))`
        - a function that takes a single argument (an iterable of args tuples) and
          returns a tuple (the reduced args). This will be *not* be passed to
          `functools.reduce`. If `reducer` is a function that takes a single
          argument, `initial` will be ignored.
    initial: any, optional
        initial value to pass to `functools.reduce`

    Examples
    --------
    >>> class T:
    ...     sig = Signal(int)
    >>> t = T()
    >>> t.sig.pause()
    >>> t.sig.emit(1)
    >>> t.sig.emit(2)
    >>> t.sig.emit(3)
    >>> t.sig.resume(lambda a, b: (a[0].union(set(b)),), (set(),))
    >>> # results in t.sig.emit({1, 2, 3})
    """
    self._is_paused = False
    # not sure why this attribute wouldn't be set, but when resuming in
    # EventedModel.update, it may be undefined (as seen in tests)
    if not getattr(self, "_args_queue", None):
        return
    if len(self._slots) == 0:
        self._args_queue.clear()
        return

    if reducer is not None:
        if len(inspect.signature(reducer).parameters) == 1:
            args = cast("ReducerOneArg", reducer)(self._args_queue)
        else:
            reducer = cast("ReducerTwoArgs", reducer)
            if initial is _NULL:
                args = reduce(reducer, self._args_queue)
            else:
                args = reduce(reducer, self._args_queue, initial)
        self._run_emit_loop(args)
    else:
        for args in self._args_queue:
            self._run_emit_loop(args)
    self._args_queue.clear()

unblock #

unblock() -> None

Unblock this signal, allowing it to emit.

Source code in src/psygnal/_signal.py
1370
1371
1372
def unblock(self) -> None:
    """Unblock this signal, allowing it to emit."""
    self._is_blocked = False

debounced #

debounced(
    func: Callable[P, Any],
    timeout: int = 100,
    leading: bool = False,
) -> Debouncer[P]
debounced(
    func: Literal[None] | None = None,
    timeout: int = 100,
    leading: bool = False,
) -> Callable[[Callable[P, Any]], Debouncer[P]]
debounced(
    func: Callable[P, Any] | None = None,
    timeout: int = 100,
    leading: bool = False,
) -> (
    Debouncer[P]
    | Callable[[Callable[P, Any]], Debouncer[P]]
)

Create a debounced function that delays invoking func.

func will not be invoked until timeout ms have elapsed since the last time the debounced function was invoked.

The debounced function comes with a cancel method to cancel delayed func invocations and a flush method to immediately invoke them. Options indicate whether func should be invoked on the leading and/or trailing edge of the wait timeout. The func is invoked with the last arguments provided to the debounced function. Subsequent calls to the debounced function return the result of the last func invocation.

This decorator may be used with or without parameters.

Parameters:

  • func #

    (Callable, default: None ) –

    A function to throttle

  • timeout #

    (int, default: 100 ) –

    Timeout in milliseconds to wait before allowing another call, by default 100

  • leading #

    (bool, default: False ) –

    Whether to invoke the function on the leading edge of the wait timer, by default False

Examples:

from psygnal import Signal, debounced

class MyEmitter:
    changed = Signal(int)

def on_change(val: int)
    # do something possibly expensive
    ...

emitter = MyEmitter()

# connect the `on_change` whenever `emitter.changed` is emitted
# ONLY once at least 50 milliseconds have passed since the last signal emission.
emitter.changed.connect(debounced(on_change, timeout=50))
Source code in src/psygnal/_throttler.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def debounced(
    func: Callable[P, Any] | None = None,
    timeout: int = 100,
    leading: bool = False,
) -> Debouncer[P] | Callable[[Callable[P, Any]], Debouncer[P]]:
    """Create a debounced function that delays invoking `func`.

    `func` will not be invoked until `timeout` ms have elapsed since the last time
    the debounced function was invoked.

    The debounced function comes with a `cancel` method to cancel delayed func
    invocations and a `flush` method to immediately invoke them. Options
    indicate whether func should be invoked on the leading and/or trailing edge
    of the wait timeout. The func is invoked with the *last* arguments provided to
    the debounced function. Subsequent calls to the debounced function return the
    result of the last `func` invocation.

    This decorator may be used with or without parameters.

    Parameters
    ----------
    func : Callable
        A function to throttle
    timeout : int
        Timeout in milliseconds to wait before allowing another call, by default 100
    leading : bool
        Whether to invoke the function on the leading edge of the wait timer,
        by default False

    Examples
    --------
    ```python
    from psygnal import Signal, debounced

    class MyEmitter:
        changed = Signal(int)

    def on_change(val: int)
        # do something possibly expensive
        ...

    emitter = MyEmitter()

    # connect the `on_change` whenever `emitter.changed` is emitted
    # ONLY once at least 50 milliseconds have passed since the last signal emission.
    emitter.changed.connect(debounced(on_change, timeout=50))
    ```
    """

    def deco(func: Callable[P, Any]) -> Debouncer[P]:
        policy: EmissionPolicy = "leading" if leading else "trailing"
        return Debouncer(func, timeout, policy)

    return deco(func) if func is not None else deco

emit_queued #

emit_queued(thread: Thread | None = None) -> None

Trigger emissions of all callbacks queued in the current thread.

Parameters:

  • thread #

    (Thread, default: None ) –

    The thread on which to invoke the callback. If not provided, the main thread will be used.

Raises:

  • EmitLoopError

    If an exception is raised while invoking a queued callback. This exception can be caught and optionally suppressed or handled by the caller, allowing the emission of other queued callbacks to continue even if one of them raises an exception.

Source code in src/psygnal/_queue.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def emit_queued(thread: Thread | None = None) -> None:
    """Trigger emissions of all callbacks queued in the current thread.

    Parameters
    ----------
    thread : Thread, optional
        The thread on which to invoke the callback.  If not provided, the main
        thread will be used.

    Raises
    ------
    EmitLoopError
        If an exception is raised while invoking a queued callback.
        This exception can be caught and optionally suppressed or handled by the caller,
        allowing the emission of other queued callbacks to continue even if one of them
        raises an exception.
    """
    _thread = current_thread() if thread is None else thread
    queue = QueuedCallback._GLOBAL_QUEUE[_thread]

    while not queue.empty():
        cb, args = queue.get()
        try:
            cb(args)
        except Exception as e:  # pragma: no cover
            raise EmitLoopError(exc=e) from e

evented #

evented(
    cls: T,
    *,
    events_namespace: str = "events",
    equality_operators: dict[str, EqOperator] | None = None,
    warn_on_no_fields: bool = ...,
    cache_on_instance: bool = ...,
    connect_child_events: bool = ...,
    signal_aliases: Mapping[str, str | None]
    | FieldAliasFunc
    | None = ...,
) -> T
evented(
    cls: Literal[None] | None = None,
    *,
    events_namespace: str = "events",
    equality_operators: dict[str, EqOperator] | None = None,
    warn_on_no_fields: bool = ...,
    cache_on_instance: bool = ...,
    connect_child_events: bool = ...,
    signal_aliases: Mapping[str, str | None]
    | FieldAliasFunc
    | None = ...,
) -> Callable[[T], T]
evented(
    cls: T | None = None,
    *,
    events_namespace: str = "events",
    equality_operators: dict[str, EqOperator] | None = None,
    warn_on_no_fields: bool = True,
    cache_on_instance: bool = True,
    connect_child_events: bool = True,
    signal_aliases: Mapping[str, str | None]
    | FieldAliasFunc
    | None = None,
) -> Callable[[T], T] | T

A decorator to add events to a dataclass.

See also the documentation for SignalGroupDescriptor. This decorator is equivalent setting a class variable named events to a new SignalGroupDescriptor instance.

Note that this decorator will modify cls in place, as well as return it.

Tip

It is recommended to use the SignalGroupDescriptor descriptor rather than the decorator, as it it is more explicit and provides for easier static type inference.

Parameters:

  • cls #

    (type, default: None ) –

    The class to decorate.

  • events_namespace #

    (str, default: 'events' ) –

    The name of the namespace to add the events to, by default "events"

  • equality_operators #

    (dict[str, Callable] | None, default: None ) –

    A dictionary mapping field names to equality operators (a function that takes two values and returns True if they are equal). These will be used to determine if a field has changed when setting a new value. By default, this will use the __eq__ method of the field type, or np.array_equal, for numpy arrays. But you can provide your own if you want to customize how equality is checked. Alternatively, if the class has an __eq_operators__ class attribute, it will be used.

  • warn_on_no_fields #

    (bool, default: True ) –

    If True (the default), a warning will be emitted if no mutable dataclass-like fields are found on the object.

  • cache_on_instance #

    (bool, default: True ) –

    If True (the default), a newly-created SignalGroup instance will be cached on the instance itself, so that subsequent accesses to the descriptor will return the same SignalGroup instance. This makes for slightly faster subsequent access, but means that the owner instance will no longer be pickleable. If False, the SignalGroup instance will still be cached, but not on the instance itself.

  • connect_child_events #

    (bool, default: True ) –

    If True, will connect events from all fields on the dataclass whose type is also "evented" (as determined by the psygnal.is_evented function, which returns True if the class has been decorated with @evented, or if it has a SignalGroupDescriptor) to the group on the parent object. By default True. This is useful for nested evented dataclasses, where you want to monitor events emitted from arbitrarily deep children on the parent object.

  • signal_aliases #

    (Mapping[str, str | None] | FieldAliasFunc | None, default: None ) –

    If defined, a mapping between field name and signal name. Field names that are not signal_aliases keys are not aliased (the signal name is the field name). If the dict value is None, do not create a signal associated with this field. If a callable, the signal name is the output of the function applied to the field name. If the output is None, no signal is created for this field. If None, defaults to an empty dict, no aliases. Default to None

Returns:

  • type

    The decorated class, which gains a new SignalGroup instance at the events_namespace attribute (by default, events).

Raises:

  • TypeError

    If the class is frozen or is not a class.

Examples:

from psygnal import evented
from dataclasses import dataclass


@evented
@dataclass
class Person:
    name: str
    age: int = 0
Source code in src/psygnal/_evented_decorator.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def evented(
    cls: T | None = None,
    *,
    events_namespace: str = "events",
    equality_operators: dict[str, EqOperator] | None = None,
    warn_on_no_fields: bool = True,
    cache_on_instance: bool = True,
    connect_child_events: bool = True,
    signal_aliases: Mapping[str, str | None] | FieldAliasFunc | None = None,
) -> Callable[[T], T] | T:
    """A decorator to add events to a dataclass.

    See also the documentation for
    [`SignalGroupDescriptor`][psygnal.SignalGroupDescriptor].  This decorator is
    equivalent setting a class variable named `events` to a new
    `SignalGroupDescriptor` instance.

    Note that this decorator will modify `cls` *in place*, as well as return it.

    !!!tip
        It is recommended to use the `SignalGroupDescriptor` descriptor rather than
        the decorator, as it it is more explicit and provides for easier static type
        inference.

    Parameters
    ----------
    cls : type
        The class to decorate.
    events_namespace : str
        The name of the namespace to add the events to, by default `"events"`
    equality_operators : dict[str, Callable] | None
        A dictionary mapping field names to equality operators (a function that takes
        two values and returns `True` if they are equal). These will be used to
        determine if a field has changed when setting a new value.  By default, this
        will use the `__eq__` method of the field type, or np.array_equal, for numpy
        arrays.  But you can provide your own if you want to customize how equality is
        checked. Alternatively, if the class has an `__eq_operators__` class attribute,
        it will be used.
    warn_on_no_fields : bool
        If `True` (the default), a warning will be emitted if no mutable dataclass-like
        fields are found on the object.
    cache_on_instance : bool, optional
        If `True` (the default), a newly-created SignalGroup instance will be cached on
        the instance itself, so that subsequent accesses to the descriptor will return
        the same SignalGroup instance.  This makes for slightly faster subsequent
        access, but means that the owner instance will no longer be pickleable.  If
        `False`, the SignalGroup instance will *still* be cached, but not on the
        instance itself.
    connect_child_events : bool, optional
        If `True`, will connect events from all fields on the dataclass whose type is
        also "evented" (as determined by the `psygnal.is_evented` function,
        which returns True if the class has been decorated with `@evented`, or if it
        has a SignalGroupDescriptor) to the group on the parent object. By default
        True.
        This is useful for nested evented dataclasses, where you want to monitor events
        emitted from arbitrarily deep children on the parent object.
    signal_aliases: Mapping[str, str | None] | Callable[[str], str | None] | None
        If defined, a mapping between field name and signal name. Field names that are
        not `signal_aliases` keys are not aliased (the signal name is the field name).
        If the dict value is None, do not create a signal associated with this field.
        If a callable, the signal name is the output of the function applied to the
        field name. If the output is None, no signal is created for this field.
        If None, defaults to an empty dict, no aliases.
        Default to None

    Returns
    -------
    type
        The decorated class, which gains a new SignalGroup instance at the
        `events_namespace` attribute (by default, `events`).

    Raises
    ------
    TypeError
        If the class is frozen or is not a class.

    Examples
    --------
    ```python
    from psygnal import evented
    from dataclasses import dataclass


    @evented
    @dataclass
    class Person:
        name: str
        age: int = 0
    ```
    """

    def _decorate(cls: T) -> T:
        if not isinstance(cls, type):  # pragma: no cover
            raise TypeError("evented can only be used on classes")
        if any(k.startswith("_psygnal") for k in getattr(cls, "__annotations__", {})):
            raise TypeError("Fields on an evented class cannot start with '_psygnal'")

        descriptor: SignalGroupDescriptor = SignalGroupDescriptor(
            equality_operators=equality_operators,
            warn_on_no_fields=warn_on_no_fields,
            cache_on_instance=cache_on_instance,
            connect_child_events=connect_child_events,
            signal_aliases=signal_aliases,
        )
        # as a decorator, this will have already been called
        descriptor.__set_name__(cls, events_namespace)
        setattr(cls, events_namespace, descriptor)
        return cls

    return _decorate(cls) if cls is not None else _decorate

get_async_backend #

get_async_backend() -> _AsyncBackend | None

Get the current async backend. Returns None if no backend is set.

Source code in src/psygnal/_async.py
32
33
34
def get_async_backend() -> _AsyncBackend | None:
    """Get the current async backend. Returns None if no backend is set."""
    return _ASYNC_BACKEND

get_evented_namespace #

get_evented_namespace(obj: object) -> str | None

Return the name of the evented SignalGroup for an object.

Note: if you get the returned name as an attribute of the object, it will be a SignalGroup instance only if obj is an instance of an evented class. If obj is the evented class itself, it will be a _SignalGroupDescriptor.

Examples:

from psygnal import evented, get_evented_namespace, is_evented


@evented(events_namespace="my_events")
class Foo: ...


assert get_evented_namespace(Foo) == "my_events"
assert is_evented(Foo)
Source code in src/psygnal/_group_descriptor.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def get_evented_namespace(obj: object) -> str | None:
    """Return the name of the evented SignalGroup for an object.

    Note: if you get the returned name as an attribute of the object, it will be a
    SignalGroup instance only if `obj` is an *instance* of an evented class.
    If `obj` is the evented class itself, it will be a `_SignalGroupDescriptor`.

    Examples
    --------
    ```python
    from psygnal import evented, get_evented_namespace, is_evented


    @evented(events_namespace="my_events")
    class Foo: ...


    assert get_evented_namespace(Foo) == "my_events"
    assert is_evented(Foo)
    ```
    """
    return getattr(obj, PSYGNAL_GROUP_NAME, None)

is_evented #

is_evented(obj: object) -> bool

Return True if the object or its class has been decorated with evented.

This also works for a setattr method that has been patched by psygnal.

Source code in src/psygnal/_group_descriptor.py
309
310
311
312
313
314
def is_evented(obj: object) -> bool:
    """Return `True` if the object or its class has been decorated with evented.

    This also works for a __setattr__ method that has been patched by psygnal.
    """
    return hasattr(obj, PSYGNAL_GROUP_NAME) or hasattr(obj, PATCHED_BY_PSYGNAL)

set_async_backend #

set_async_backend(
    backend: Literal["asyncio"],
) -> AsyncioBackend
set_async_backend(
    backend: Literal["anyio"],
) -> AnyioBackend
set_async_backend(backend: Literal['trio']) -> TrioBackend
set_async_backend(
    backend: SupportedBackend = "asyncio",
) -> _AsyncBackend

Set the async backend to use. Must be one of: 'asyncio', 'anyio', 'trio'.

This should be done as early as possible, and must be called before calling SignalInstance.connect with a coroutine function.

Source code in src/psygnal/_async.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def set_async_backend(backend: SupportedBackend = "asyncio") -> _AsyncBackend:
    """Set the async backend to use. Must be one of: 'asyncio', 'anyio', 'trio'.

    This should be done as early as possible, and *must* be called before calling
    `SignalInstance.connect` with a coroutine function.
    """
    global _ASYNC_BACKEND

    if _ASYNC_BACKEND and _ASYNC_BACKEND._backend != backend:  # pragma: no cover
        # allow setting the same backend multiple times, for tests
        raise RuntimeError(f"Async backend already set to: {_ASYNC_BACKEND._backend}")

    if backend == "asyncio":
        _ASYNC_BACKEND = AsyncioBackend()
    elif backend == "anyio":
        _ASYNC_BACKEND = AnyioBackend()
    elif backend == "trio":
        _ASYNC_BACKEND = TrioBackend()
    else:  # pragma: no cover
        raise RuntimeError(
            f"Async backend not supported: {backend}.  "
            "Must be one of: 'asyncio', 'anyio', 'trio'"
        )

    return _ASYNC_BACKEND

throttled #

throttled(
    func: Callable[P, Any],
    timeout: int = 100,
    leading: bool = True,
) -> Throttler[P]
throttled(
    func: Literal[None] | None = None,
    timeout: int = 100,
    leading: bool = True,
) -> Callable[[Callable[P, Any]], Throttler[P]]
throttled(
    func: Callable[P, Any] | None = None,
    timeout: int = 100,
    leading: bool = True,
) -> (
    Throttler[P]
    | Callable[[Callable[P, Any]], Throttler[P]]
)

Create a throttled function that invokes func at most once per timeout.

The throttled function comes with a cancel method to cancel delayed func invocations and a flush method to immediately invoke them. Options to indicate whether func should be invoked on the leading and/or trailing edge of the wait timeout. The func is invoked with the last arguments provided to the throttled function. Subsequent calls to the throttled function return the result of the last func invocation.

This decorator may be used with or without parameters.

Parameters:

  • func #

    (Callable, default: None ) –

    A function to throttle

  • timeout #

    (int, default: 100 ) –

    Timeout in milliseconds to wait before allowing another call, by default 100

  • leading #

    (bool, default: True ) –

    Whether to invoke the function on the leading edge of the wait timer, by default True

Examples:

from psygnal import Signal, throttled

class MyEmitter:
    changed = Signal(int)

def on_change(val: int)
    # do something possibly expensive
    ...

emitter = MyEmitter()

# connect the `on_change` whenever `emitter.changed` is emitted
# BUT, no more than once every 50 milliseconds
emitter.changed.connect(throttled(on_change, timeout=50))
Source code in src/psygnal/_throttler.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def throttled(
    func: Callable[P, Any] | None = None,
    timeout: int = 100,
    leading: bool = True,
) -> Throttler[P] | Callable[[Callable[P, Any]], Throttler[P]]:
    """Create a throttled function that invokes func at most once per timeout.

    The throttled function comes with a `cancel` method to cancel delayed func
    invocations and a `flush` method to immediately invoke them. Options
    to indicate whether func should be invoked on the leading and/or trailing
    edge of the wait timeout. The func is invoked with the last arguments provided
    to the throttled function. Subsequent calls to the throttled function return
    the result of the last func invocation.

    This decorator may be used with or without parameters.

    Parameters
    ----------
    func : Callable
        A function to throttle
    timeout : int
        Timeout in milliseconds to wait before allowing another call, by default 100
    leading : bool
        Whether to invoke the function on the leading edge of the wait timer,
        by default True

    Examples
    --------
    ```python
    from psygnal import Signal, throttled

    class MyEmitter:
        changed = Signal(int)

    def on_change(val: int)
        # do something possibly expensive
        ...

    emitter = MyEmitter()

    # connect the `on_change` whenever `emitter.changed` is emitted
    # BUT, no more than once every 50 milliseconds
    emitter.changed.connect(throttled(on_change, timeout=50))
    ```
    """

    def deco(func: Callable[P, Any]) -> Throttler[P]:
        policy: EmissionPolicy = "leading" if leading else "trailing"
        return Throttler(func, timeout, policy)

    return deco(func) if func is not None else deco