359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701 | @dataclass_transform(kw_only_default=True, field_specifiers=(pydantic.Field,))
class EventedModel(pydantic.BaseModel, metaclass=EventedMetaclass):
"""A pydantic BaseModel that emits a signal whenever a field value is changed.
!!! important
This class requires `pydantic` to be installed.
You can install directly (`pip install pydantic`) or by using the psygnal
extra: `pip install psygnal[pydantic]`
In addition to standard pydantic `BaseModel` properties
(see [pydantic docs](https://pydantic-docs.helpmanual.io/usage/models/)),
this class adds the following:
1. gains an `events` attribute that is an instance of [`psygnal.SignalGroup`][].
This group will have a signal for each field in the model (excluding private
attributes and non-mutable fields). Whenever a field in the model is mutated,
the corresponding signal will emit with the new value (see example below).
2. Gains support for properties and property.setters (not supported in pydantic's
BaseModel). Enable by adding `allow_property_setters = True` to your model
`Config`.
3. If you would like properties (i.e. "computed fields") to emit an event when
one of the model fields it depends on is mutated you must set one of the
following options in the `Config`:
- `property_dependencies` may be a `Dict[str, List[str]]`, where the
keys are the names of properties, and the values are a list of field names
(strings) that the property depends on for its value
- `guess_property_dependencies` may be set to `True` to "guess" property
dependencies by inspecting the source code of the property getter for.
4. If you would like to allow custom fields to provide their own json_encoders, you
can either use the standard pydantic method of adding json_encoders to your
model, for each field type you'd like to support:
https://pydantic-docs.helpmanual.io/usage/exporting_models/#json_encoders
This `EventedModel` class will additionally look for a `_json_encode` method
on any field types in the model. If a field type declares a `_json_encode`
method, it will be added to the
[`json_encoders`](https://pydantic-docs.helpmanual.io/usage/exporting_models/#json_encoders)
dict in the model `Config`.
Examples
--------
Standard EventedModel example:
```python
class MyModel(EventedModel):
x: int = 1
m = MyModel()
m.events.x.connect(lambda v: print(f"new value is {v}"))
m.x = 3 # prints 'new value is 3'
```
An example of using property_setters and emitting signals when a field dependency
is mutated.
```python
class MyModel(EventedModel):
a: int = 1
b: int = 1
@property
def c(self) -> List[int]:
return [self.a, self.b]
@c.setter
def c(self, val: Sequence[int]) -> None:
self.a, self.b = val
class Config:
allow_property_setters = True
field_dependencies = {"c": ["a", "b"]}
m = MyModel()
assert m.c == [1, 1]
m.events.c.connect(lambda v: print(f"c updated to {v}"))
m.a = 2 # prints 'c updated to [2, 1]'
```
"""
# add private attributes for event emission
_events: ClassVar[SignalGroup] = PrivateAttr()
# mapping of name -> property obj for methods that are property setters
__property_setters__: ClassVar[Dict[str, property]]
# mapping of field name -> dependent set of property names
# when field is changed, an event for dependent properties will be emitted.
__field_dependents__: ClassVar[Dict[str, Set[str]]]
__eq_operators__: ClassVar[Dict[str, "EqOperator"]]
__slots__ = {"__weakref__"}
__signal_group__: ClassVar[Type[SignalGroup]]
_changes_queue: Dict[str, Any] = PrivateAttr(default_factory=dict)
_primary_changes: Set[str] = PrivateAttr(default_factory=set)
_delay_check_semaphore: int = PrivateAttr(0)
if PYDANTIC_V1:
class Config:
# this seems to be necessary for the _json_encoders trick to work
json_encoders: ClassVar[dict] = {"____": None}
def __init__(_model_self_, **data: Any) -> None:
super().__init__(**data)
Group = _model_self_.__signal_group__
# the type error is "cannot assign to a class variable" ...
# but if we don't use `ClassVar`, then the `dataclass_transform` decorator
# will add _events: SignalGroup to the __init__ signature, for *all* user models
_model_self_._events = Group(_model_self_) # type: ignore [misc]
# expose the private SignalGroup publicly
@property
def events(self) -> SignalGroup:
"""Return the `SignalGroup` containing all events for this model."""
return self._events
@property
def _defaults(self) -> Dict[str, Any]:
return _get_defaults(self)
def __eq__(self, other: Any) -> bool:
"""Check equality with another object.
We override the pydantic approach (which just checks
``self.model_dump() == other.model_dump()``) to accommodate more complicated
types like arrays, whose truth value is often ambiguous. ``__eq_operators__``
is constructed in ``EqualityMetaclass.__new__``
"""
if not isinstance(other, EventedModel):
return bool(_model_dump(self) == other)
for f_name, _ in self.__eq_operators__.items():
if not hasattr(self, f_name) or not hasattr(other, f_name):
return False # pragma: no cover
a = getattr(self, f_name)
b = getattr(other, f_name)
if not _check_field_equality(type(self), f_name, a, b):
return False
return True
def update(self, values: Union["EventedModel", dict], recurse: bool = True) -> None:
"""Update a model in place.
Parameters
----------
values : Union[dict, EventedModel]
Values to update the model with. If an EventedModel is passed it is
first converted to a dictionary. The keys of this dictionary must
be found as attributes on the current model.
recurse : bool
If True, recursively update fields that are EventedModels.
Otherwise, just update the immediate fields of this EventedModel,
which is useful when the declared field type (e.g. ``Union``) can have
different realized types with different fields.
"""
if isinstance(values, pydantic.BaseModel):
values = _model_dump(values)
if not isinstance(values, dict): # pragma: no cover
raise TypeError(f"values must be a dict or BaseModel. got {type(values)}")
with self.events._psygnal_relay.paused(): # TODO: reduce?
for key, value in values.items():
field = getattr(self, key)
if isinstance(field, EventedModel) and recurse:
field.update(value, recurse=recurse)
else:
setattr(self, key, value)
def reset(self) -> None:
"""Reset the state of the model to default values."""
model_config = _get_config(self)
model_fields = _get_fields(self)
for name, value in self._defaults.items():
if isinstance(value, EventedModel):
cast("EventedModel", getattr(self, name)).reset()
elif not model_config.get("frozen") and not model_fields[name].frozen:
setattr(self, name, value)
def _check_if_values_changed_and_emit_if_needed(self) -> None:
"""
Check if field values changed and emit events if needed.
The advantage of moving this to the end of all the modifications is
that comparisons will be performed only once for every potential change.
"""
if self._delay_check_semaphore > 0 or len(self._changes_queue) == 0:
# do not run whole machinery if there is no need
return
to_emit = []
for name in self._primary_changes:
# primary changes should contains only fields
# that are changed directly by assignment
old_value = self._changes_queue[name]
new_value = getattr(self, name)
if not _check_field_equality(type(self), name, new_value, old_value):
to_emit.append((name, new_value))
self._changes_queue.pop(name)
if not to_emit:
# If no direct changes was made then we can skip whole machinery
self._changes_queue.clear()
self._primary_changes.clear()
return
for name, old_value in self._changes_queue.items():
# check if any of dependent properties changed
new_value = getattr(self, name)
if not _check_field_equality(type(self), name, new_value, old_value):
to_emit.append((name, new_value))
self._changes_queue.clear()
self._primary_changes.clear()
with ComparisonDelayer(self):
# Again delay comparison to avoid having events caused by callback functions
for name, new_value in to_emit:
getattr(self._events, name)(new_value)
def __setattr__(self, name: str, value: Any) -> None:
if (
name == "_events"
or not hasattr(self, "_events") # can happen on init
or name not in self._events
):
# fallback to default behavior
return self._super_setattr_(name, value)
# the _setattr_default method is overridden in __new__ to be one of
# `_setattr_no_dependants` or `_setattr_with_dependents`.
self._setattr_default(name, value)
def _super_setattr_(self, name: str, value: Any) -> None:
# pydantic will raise a ValueError if extra fields are not allowed
# so we first check to see if this field has a property.setter.
# if so, we use it instead.
if name in self.__property_setters__:
self.__property_setters__[name].fset(self, value) # type: ignore[misc]
elif name == "_events":
# pydantic v2 prohibits shadowing class vars, on instances
object.__setattr__(self, name, value)
else:
super().__setattr__(name, value)
def _setattr_default(self, name: str, value: Any) -> None:
"""Will be overwritten by metaclass __new__.
It will become either `_setattr_no_dependants` (if the class has no
properties and `__field_dependents__`), or `_setattr_with_dependents` if it
does.
"""
def _setattr_no_dependants(self, name: str, value: Any) -> None:
"""__setattr__ behavior when the class has no properties."""
group = self._events
signal_instance: SignalInstance = group[name]
if len(signal_instance) < 1:
return self._super_setattr_(name, value)
old_value = getattr(self, name, object())
self._super_setattr_(name, value)
if not _check_field_equality(type(self), name, value, old_value):
getattr(self._events, name)(value)
def _setattr_with_dependents(self, name: str, value: Any) -> None:
"""__setattr__ behavior when the class does properties."""
with ComparisonDelayer(self):
self._setattr_impl(name, value)
def _setattr_impl(self, name: str, value: Any) -> None:
# if there are no listeners, we can just set the value without emitting
# so first check if there are any listeners for this field or any of its
# dependent properties.
# note that ALL signals will have sat least one listener simply by nature of
# being in the `self._events` SignalGroup.
group = self._events
signal_instance: SignalInstance = group[name]
deps_with_callbacks = {
dep_name
for dep_name in self.__field_dependents__.get(name, ())
if len(group[dep_name])
}
if (
len(signal_instance) < 1 # the signal itself has no listeners
and not deps_with_callbacks # no dependent properties with listeners
and not len(group._psygnal_relay) # no listeners on the SignalGroup
):
return self._super_setattr_(name, value)
self._primary_changes.add(name)
if name not in self._changes_queue:
self._changes_queue[name] = getattr(self, name, object())
for dep in deps_with_callbacks:
if dep not in self._changes_queue:
self._changes_queue[dep] = getattr(self, dep, object())
self._super_setattr_(name, value)
if PYDANTIC_V1:
@contextmanager
def enums_as_values(self, as_values: bool = True) -> Iterator[None]:
"""Temporarily override how enums are retrieved.
Parameters
----------
as_values : bool
Whether enums should be shown as values (or as enum objects),
by default `True`
"""
before = getattr(self.Config, "use_enum_values", NULL)
self.Config.use_enum_values = as_values # type: ignore
try:
yield
finally:
if before is not NULL:
self.Config.use_enum_values = before # type: ignore # pragma: no cover
else:
delattr(self.Config, "use_enum_values")
else:
@classmethod
@contextmanager
def enums_as_values(
cls, as_values: bool = True
) -> Iterator[None]: # pragma: no cover
"""Temporarily override how enums are retrieved.
Parameters
----------
as_values : bool
Whether enums should be shown as values (or as enum objects),
by default `True`
"""
before = cls.model_config.get("use_enum_values", NULL)
cls.model_config["use_enum_values"] = as_values
try:
yield
finally:
if before is not NULL: # pragma: no cover
cls.model_config["use_enum_values"] = cast(bool, before)
else:
cls.model_config.pop("use_enum_values")
|