__doc__ = """
Base System
-----------
Basic coordinating for multiple, smaller systems that have an independently integrable
interface (i.e. works with symplectic or explicit routines `timestepper.py`.)
"""
from typing import Iterable, Callable, AnyStr
from collections.abc import MutableSequence
from elastica.rod import RodBase
from elastica.rigidbody import RigidBodyBase
from elastica.surface import SurfaceBase
from elastica.modules.memory_block import construct_memory_block_structures
from elastica._synchronize_periodic_boundary import _ConstrainPeriodicBoundaries
[docs]class BaseSystemCollection(MutableSequence):
"""
Base System for simulator classes. Every simulation class written by the user
must be derived from the BaseSystemCollection class; otherwise the simulation will
proceed.
Attributes
----------
allowed_sys_types: tuple
Tuple of allowed type rod-like objects. Here use a base class for objects, i.e. RodBase.
_systems: list
List of rod-like objects.
"""
"""
Developer Note
-----
Note
----
We can directly subclass a list for the
most part, but this is a bad idea, as List is non abstract
https://stackoverflow.com/q/3945940
"""
def __init__(self):
# Collection of functions. Each group is executed as a collection at the different steps.
# Each component (Forcing, Connection, etc.) registers the executable (callable) function
# in the group that that needs to be executed. These should be initialized before mixin.
self._feature_group_synchronize: Iterable[Callable[[float], None]] = []
self._feature_group_constrain_values: Iterable[Callable[[float], None]] = []
self._feature_group_constrain_rates: Iterable[Callable[[float], None]] = []
self._feature_group_callback: Iterable[
Callable[[float, int, AnyStr], None]
] = []
self._feature_group_finalize: Iterable[Callable] = []
# We need to initialize our mixin classes
super(BaseSystemCollection, self).__init__()
# List of system types/bases that are allowed
self.allowed_sys_types = (RodBase, RigidBodyBase, SurfaceBase)
# List of systems to be integrated
self._systems = []
# Flag Finalize: Finalizing twice will cause an error,
# but the error message is very misleading
self._finalize_flag = False
def _check_type(self, sys_to_be_added: AnyStr):
if not issubclass(sys_to_be_added.__class__, self.allowed_sys_types):
raise TypeError(
"{0}\n"
"is not a system passing validity\n"
"checks, that can be added into BaseSystem. If you are sure that\n"
"{0}\n"
"satisfies all criteria for being a system, please add\n"
"it using BaseSystem.extend_allowed_types.\n"
"The allowed types are\n"
"{1}".format(sys_to_be_added.__class__, self.allowed_sys_types)
)
return True
def __len__(self):
return len(self._systems)
def __getitem__(self, idx):
return self._systems[idx]
def __delitem__(self, idx):
del self._systems[idx]
def __setitem__(self, idx, system):
self._check_type(system)
self._systems[idx] = system
def insert(self, idx, system):
self._check_type(system)
self._systems.insert(idx, system)
def __str__(self):
return str(self._systems)
def extend_allowed_types(self, additional_types):
self.allowed_sys_types += additional_types
def override_allowed_types(self, allowed_types):
self.allowed_sys_types = allowed_types
def _get_sys_idx_if_valid(self, sys_to_be_added):
from numpy import int_ as npint
n_systems = len(self._systems) # Total number of systems from mixed-in class
if isinstance(sys_to_be_added, (int, npint)):
# 1. If they are indices themselves, check range
assert (
-n_systems <= sys_to_be_added < n_systems
), "Rod index {} exceeds number of registered rodtems".format(
sys_to_be_added
)
sys_idx = sys_to_be_added
elif self._check_type(sys_to_be_added):
# 2. If they are rod objects (most likely), lookup indices
# index might have some problems : https://stackoverflow.com/a/176921
try:
sys_idx = self._systems.index(sys_to_be_added)
except ValueError:
raise ValueError(
"Rod {} was not found, did you append it to the system?".format(
sys_to_be_added
)
)
return sys_idx
[docs] def finalize(self):
"""
This method finalizes the simulator class. When it is called, it is assumed that the user has appended
all rod-like objects to the simulator as well as all boundary conditions, callbacks, etc.,
acting on these rod-like objects. After the finalize method called,
the user cannot add new features to the simulator class.
"""
# This generates more straight-forward error.
assert self._finalize_flag is not True, "The finalize cannot be called twice."
# construct memory block
self._memory_blocks = construct_memory_block_structures(self._systems)
"""
In case memory block have ring rod, then periodic boundaries have to be synched. In order to synchronize
periodic boundaries, a new constrain for memory block rod added called as _ConstrainPeriodicBoundaries. This
constrain will synchronize the only periodic boundaries of position, director, velocity and omega variables.
"""
for i in range(len(self._memory_blocks)):
# append the memory block to the simulation as a system. Memory block is the final system in the simulation.
self.append(self._memory_blocks[i])
if hasattr(self._memory_blocks[i], "ring_rod_flag"):
# Apply the constrain to synchronize the periodic boundaries of the memory rod. Find the memory block
# sys idx among other systems added and then apply boundary conditions.
memory_block_idx = self._get_sys_idx_if_valid(self._memory_blocks[i])
self.constrain(self._systems[memory_block_idx]).using(
_ConstrainPeriodicBoundaries,
)
# Recurrent call finalize functions for all components.
for finalize in self._feature_group_finalize:
finalize()
# Clear the finalize feature group, just for the safety.
self._feature_group_finalize.clear()
self._feature_group_finalize = None
# Toggle the finalize_flag
self._finalize_flag = True
# sort _feature_group_synchronize so that _call_contacts is at the end
_call_contacts_index = []
for idx, feature in enumerate(self._feature_group_synchronize):
if feature.__name__ == "_call_contacts":
_call_contacts_index.append(idx)
# Move to the _call_contacts to the end of the _feature_group_synchronize list.
for index in _call_contacts_index:
self._feature_group_synchronize.append(
self._feature_group_synchronize.pop(index)
)
def synchronize(self, time: float):
# Collection call _feature_group_synchronize
for feature in self._feature_group_synchronize:
feature(time)
def constrain_values(self, time: float):
# Collection call _feature_group_constrain_values
for feature in self._feature_group_constrain_values:
feature(time)
def constrain_rates(self, time: float):
# Collection call _feature_group_constrain_rates
for feature in self._feature_group_constrain_rates:
feature(time)
def apply_callbacks(self, time: float, current_step: int):
# Collection call _feature_group_callback
for feature in self._feature_group_callback:
feature(time, current_step)