Source code for seddy._specs._base

"""SWF decisions making."""

import abc
import enum
import dataclasses
import typing as t


class DeciderError(RuntimeError):
    """Misconfiguration of the decider."""


[docs]class ChildPolicy(enum.Enum): """Policy for child executions on parent termination. .. seealso:: `StartWorkflowExecution in SWF API documentation <https://docs.aws.amazon.com/amazonswf/latest/apireference/API_StartWorkflowExecution.html#SWF-StartWorkflowExecution-request-childPolicy>`_ """ TERMINATE = "TERMINATE" REQUEST_CANCEL = "REQUEST_CANCEL" ABANDON = "ABANDON"
[docs]@dataclasses.dataclass class Registration: """Workflow registration configuration. Args: active: registration status, ``False`` for deprecated task_timeout: default decision task time-out (seconds), or "NONE" for unlimited execution_timeout: default workflow execution time-out (seconds) task_list: default decision task-list task_priority: default decision task priority child_policy: default policy for child executions upon parent execution termination lambda_role: default IAM role for Lambda access """ active: bool = True task_timeout: t.Union[int, str] = None execution_timeout: int = None task_list: str = None task_priority: int = None child_policy: ChildPolicy = None lambda_role: str = None
[docs] @classmethod def from_spec(cls, spec: t.Dict[str, t.Any]): """Construct registration configuration from specification. Args: spec: workflow registration configuration specification """ kw = {} if "active" in spec: kw["active"] = spec["active"] if "task_timeout" in spec: kw["task_timeout"] = spec["task_timeout"] if "execution_timeout" in spec: kw["execution_timeout"] = spec["execution_timeout"] if "task_list" in spec: kw["task_list"] = spec["task_list"] if "task_priority" in spec: kw["task_priority"] = spec["task_priority"] if "child_policy" in spec: kw["child_policy"] = ChildPolicy(spec["child_policy"]) if "lambda_role" in spec: kw["lambda_role"] = spec["lambda_role"] return cls(**kw)
[docs]class DecisionsBuilder(metaclass=abc.ABCMeta): """SWF decision builder. Args: workflow: workflow specification task: decision task """ def __init__(self, workflow: "Workflow", task: t.Dict[str, t.Any]): self.workflow = workflow self.task = task self.decisions = []
[docs] @abc.abstractmethod def build_decisions(self): # pragma: no cover """Build decisions from workflow history.""" raise NotImplementedError
[docs]class Workflow(metaclass=abc.ABCMeta): """SWF workflow specification. Args: name: workflow name version: workflow version registration: workflow registration configuration """ _registration_cls = Registration def __init__( self, name: str, version: str, description: str = None, registration: Registration = None, ): self.name = name self.version = version self.description = description self.registration = registration @classmethod def _args_from_spec( cls, spec: t.Dict[str, t.Any] ) -> t.Tuple[tuple, t.Dict[str, t.Any]]: """Construct initialisation arguments from workflow specification. Args: spec: workflow specification Returns: initialisation positional and keyword arguments """ args = (spec["name"], spec["version"]) kwargs = {} if "description" in spec: kwargs["description"] = spec["description"] if "registration" in spec: kwargs["registration"] = cls._registration_cls.from_spec( spec["registration"] ) return args, kwargs
[docs] @classmethod def from_spec(cls, spec: t.Dict[str, t.Any]): """Construct workflow type from specification. Args: spec: workflow specification """ args, kwargs = cls._args_from_spec(spec) return cls(*args, **kwargs)
@property @abc.abstractmethod def decisions_builder(self) -> t.Type[DecisionsBuilder]: # pragma: no cover raise NotImplementedError @property @abc.abstractmethod def spec_type(self) -> str: # pragma: no cover raise NotImplementedError
[docs] def setup(self): """Set up workflow specification. Useful for pre-calculation or other initialisation. """
[docs] def make_decisions(self, task: t.Dict[str, t.Any]) -> t.List[t.Dict[str, t.Any]]: """Build decisions from workflow history. Args: task: decision task Returns: workflow decisions """ builder = self.decisions_builder(self, task) builder.build_decisions() return builder.decisions
def make_decisions_on_error(exception: Exception) -> t.List[t.Dict[str, t.Any]]: """Build workflow-fail decision on decider exception. Args: exception: decider exception being handled Returns: workflow-fail decision """ decision_attrs = {"reason": exception.__class__.__name__} message = str(exception) if message: decision_attrs["details"] = message decision = { "decisionType": "FailWorkflowExecution", "failWorkflowExecutionDecisionAttributes": decision_attrs, } return [decision]