# Source code for seddy._specs._dag

"""SWF decision-making for DAG-type workflows."""

import json
import string
import dataclasses
import typing as t
import logging as lg

from . import _base

logger = lg.getLogger(__name__)
# Characters permitted inside a JSONPath key or index (see _get_item_jsonpath)
_jsonpath_characters = "".join((string.digits, string.ascii_letters, "_"))
# Unique "no value" marker, distinguishable from every JSON value (incl. None)
_sentinel = object()
# SWF history event type -> key of that event's attributes in the event
# dict; every key is the event type with a lower-cased first letter and an
# "EventAttributes" suffix, so the table is generated from the type names
_attr_keys = {
    event_type[0].lower() + event_type[1:]
    and event_type: event_type[0].lower() + event_type[1:] + "EventAttributes"
    for event_type in (
        "ActivityTaskCancelRequested",
        "ActivityTaskCanceled",
        "ActivityTaskCompleted",
        "ActivityTaskFailed",
        "ActivityTaskScheduled",
        "ActivityTaskStarted",
        "ActivityTaskTimedOut",
        "CancelTimerFailed",
        "CancelWorkflowExecutionFailed",
        "ChildWorkflowExecutionCanceled",
        "ChildWorkflowExecutionCompleted",
        "ChildWorkflowExecutionFailed",
        "ChildWorkflowExecutionStarted",
        "ChildWorkflowExecutionTerminated",
        "ChildWorkflowExecutionTimedOut",
        "CompleteWorkflowExecutionFailed",
        "ContinueAsNewWorkflowExecutionFailed",
        "DecisionTaskCompleted",
        "DecisionTaskScheduled",
        "DecisionTaskStarted",
        "DecisionTaskTimedOut",
        "ExternalWorkflowExecutionCancelRequested",
        "ExternalWorkflowExecutionSignaled",
        "FailWorkflowExecutionFailed",
        "LambdaFunctionCompleted",
        "LambdaFunctionFailed",
        "LambdaFunctionScheduled",
        "LambdaFunctionStarted",
        "LambdaFunctionTimedOut",
        "MarkerRecorded",
        "RecordMarkerFailed",
        "RequestCancelActivityTaskFailed",
        "RequestCancelExternalWorkflowExecutionFailed",
        "RequestCancelExternalWorkflowExecutionInitiated",
        "ScheduleActivityTaskFailed",
        "ScheduleLambdaFunctionFailed",
        "SignalExternalWorkflowExecutionFailed",
        "SignalExternalWorkflowExecutionInitiated",
        "StartChildWorkflowExecutionFailed",
        "StartChildWorkflowExecutionInitiated",
        "StartLambdaFunctionFailed",
        "StartTimerFailed",
        "TimerCanceled",
        "TimerFired",
        "TimerStarted",
        "WorkflowExecutionCancelRequested",
        "WorkflowExecutionCanceled",
        "WorkflowExecutionCompleted",
        "WorkflowExecutionContinuedAsNew",
        "WorkflowExecutionFailed",
        "WorkflowExecutionSignaled",
        "WorkflowExecutionStarted",
        "WorkflowExecutionTerminated",
        "WorkflowExecutionTimedOut",
    )
}
# New events of these types divert the decider onto the error-handling path
# (see DAGBuilder._process_new_events) instead of normal task scheduling
_error_events = {
    # activity task failures
    "ActivityTaskFailed",
    "ActivityTaskTimedOut",
    # failed decisions
    "CancelTimerFailed",
    "CancelWorkflowExecutionFailed",
    "CompleteWorkflowExecutionFailed",
    "FailWorkflowExecutionFailed",
    "RequestCancelActivityTaskFailed",
    "ScheduleActivityTaskFailed",
    "StartTimerFailed",
    # time-outs
    "DecisionTaskTimedOut",
    # cancellation
    "WorkflowExecutionCancelRequested",
    # other
    "RecordMarkerFailed",
}
# Event types which belong to a single activity task's lifecycle
_activity_events = {
    "ActivityTaskScheduled",
    "ActivityTaskStarted",
    "ActivityTaskCompleted",
    "ActivityTaskFailed",
    "ActivityTaskTimedOut",
}
# Event types recording that a decision made by a decider failed
_decision_failed_events = {
    # activity tasks and timers
    "ScheduleActivityTaskFailed",
    "RequestCancelActivityTaskFailed",
    "StartTimerFailed",
    "CancelTimerFailed",
    # other workflow executions
    "StartChildWorkflowExecutionFailed",
    "SignalExternalWorkflowExecutionFailed",
    "RequestCancelExternalWorkflowExecutionFailed",
    # closing this workflow execution
    "CancelWorkflowExecutionFailed",
    "CompleteWorkflowExecutionFailed",
    "ContinueAsNewWorkflowExecutionFailed",
    "FailWorkflowExecutionFailed",
}


@dataclasses.dataclass
class TaskInput:
    """Activity task input specification base class."""

    @staticmethod
    def from_spec(spec: t.Dict[str, t.Any]) -> "TaskInput":
        """Construct a task input specification from its serialised form.

        Args:
            spec: input specification; its "type" key selects the input
                specification class

        Returns:
            input specification of the matching class

        Raises:
            ValueError: unknown input type
        """

        spec_type = spec["type"]
        candidates = (NoInput, Constant, WorkflowInput, DependencyResult, Object)
        input_cls = next((c for c in candidates if c.type == spec_type), None)
        if input_cls is None:  # TODO: unit-test
            raise ValueError(spec_type)
        return input_cls.from_spec(spec)


@dataclasses.dataclass
class NoInput(TaskInput):
    """Input specification for activity tasks which are sent no input."""

    type: t.ClassVar = "none"

    @classmethod
    def from_spec(cls, spec) -> "NoInput":
        """Construct from specification; ``spec`` carries no options."""
        instance = cls()
        return instance


@dataclasses.dataclass
class Constant(TaskInput):
    """Input specification for a fixed, literal input value."""

    type: t.ClassVar = "constant"
    value: t.Any  # the input value itself

    @classmethod
    def from_spec(cls, spec) -> "Constant":
        """Construct from specification with a required "value" key."""
        return cls(value=spec["value"])


@dataclasses.dataclass
class WorkflowInput(TaskInput):
    """Input specification taken from (part of) the workflow input.

    Args:
        path: single-valued JSONPath-style path into the workflow input
        default: value used when the pointed-to item is missing
    """

    type: t.ClassVar = "workflow-input"
    path: str = "$"
    default: t.Any = _sentinel

    @classmethod
    def from_spec(cls, spec) -> "WorkflowInput":
        """Construct from specification with optional "path"/"default"."""
        optional_keys = ("path", "default")
        return cls(**{key: spec[key] for key in optional_keys if key in spec})


@dataclasses.dataclass
class DependencyResult(TaskInput):
    """Input specification taken from (part of) another task's result.

    Args:
        id: ID of the task whose result is used
        path: single-valued JSONPath-style path into that result
        default: value used when the pointed-to item is missing
    """

    type: t.ClassVar = "dependency-result"
    id: t.Any
    path: str = "$"
    default: t.Any = _sentinel

    @classmethod
    def from_spec(cls, spec) -> "DependencyResult":
        """Construct from specification with "id", optional "path"/"default"."""
        found = {name: spec[name] for name in ("path", "default") if name in spec}
        return cls(spec["id"], **found)


@dataclasses.dataclass
class Object(TaskInput):
    """Input specification building a mapping from sub-specifications.

    Args:
        items: input specification of each mapping value, by key; items
            which resolve to no input are left out of the built mapping
    """

    type: t.ClassVar = "object"
    items: t.Dict[str, TaskInput]

    @classmethod
    def from_spec(cls, spec) -> "Object":
        """Construct from specification with an "items" mapping.

        Each item is itself an input specification of any type (constant,
        workflow-input, nested object, ...).
        """

        items = {}
        for key, subspec in spec["items"].items():
            # Dispatch on the item's own "type": 'cls.from_spec' would parse
            # every item as another object and fail on its missing "items"
            items[key] = TaskInput.from_spec(subspec)
        return cls(items)


@dataclasses.dataclass
class Task:  # TODO: unit-test
    """DAG-type workflow activity task specification.

    Args:
        id: task ID, must be unique within a workflow execution and
            without ':', '/', '|', 'arn' or any control character
        name: activity type name
        version: activity type version
        input: activity task input specification; when not provided, the
            activity task is scheduled without input
        heartbeat: task heartbeat time-out (seconds), or "NONE" for
            unlimited
        timeout: task start-to-close time-out (seconds), or "NONE" for
            unlimited
        task_list: task-list to schedule task on
        priority: task priority
        dependencies: IDs of task's dependencies
    """

    id: str
    name: str
    version: str
    input: t.Optional[
        t.Union[NoInput, Constant, WorkflowInput, DependencyResult, Object]
    ] = None
    heartbeat: t.Optional[t.Union[int, str]] = None
    timeout: t.Optional[t.Union[int, str]] = None
    task_list: t.Optional[str] = None
    priority: t.Optional[int] = None
    dependencies: t.Optional[t.List[str]] = None
    _input_cls: t.ClassVar = TaskInput

    @property
    def type(self) -> t.Dict[str, str]:
        """Activity type."""
        return {"name": self.name, "version": self.version}

    @classmethod
    def from_spec(cls, spec: t.Dict[str, t.Any]) -> "Task":
        """Construct task specification from its serialised form.

        Args:
            spec: task specification, with keys "id" and "type" (a mapping
                with "name" and "version"), and optionally "input",
                "heartbeat", "timeout", "task_list", "priority" and
                "dependencies"

        Returns:
            task specification
        """

        args = (spec["id"], spec["type"]["name"], spec["type"]["version"])
        kwargs = {}
        if "input" in spec:
            kwargs["input"] = cls._input_cls.from_spec(spec["input"])
        # These keys map one-to-one onto same-named fields
        for key in ("heartbeat", "timeout", "task_list", "priority", "dependencies"):
            if key in spec:
                kwargs[key] = spec[key]
        return cls(*args, **kwargs)


def _get(item_id, items, id_key):
    """Return the first item of ``items`` whose ``id_key`` equals ``item_id``.

    Raises:
        StopIteration: no item matches
    """
    matches = filter(lambda candidate: candidate[id_key] == item_id, items)
    return next(matches)


def _get_item_jsonpath(path: str, obj, default: t.Any = _sentinel) -> t.Any:
    """Get a child item from an object.

    Supports a basic single-valued subset of JSONPath: the root ``$``
    followed by any number of ``.key`` components (keys made of ASCII
    letters, digits and underscores) and ``[n]`` components (non-negative
    integer indices), eg ``$.foo[0].bar``.

    Args:
        path: path to child item
        obj: object to get child item from
        default: value returned if a key or index along the path is
            missing; if not provided, the lookup error propagates

    Returns:
        pointed-to child item

    Raises:
        ValueError: invalid path
        KeyError: missing mapping key, and no default provided
        IndexError: out-of-range sequence index, and no default provided
    """

    if not path or path[0] != "$":
        raise ValueError("invalid path (must start at root): %s" % path)

    indices = []
    chars = []
    state = None  # None: after root or ']'; ".": reading key; "[": reading index
    for char in path[1:]:
        if char in ".[":
            if state:
                if state != ".":
                    raise ValueError("invalid path (missing closing ']'): %s" % path)
                elif not chars:
                    raise ValueError("invalid path (empty key): %s" % path)
                indices.append("".join(chars))
            chars = []
            state = char
        elif char == "]":
            if state != "[":
                raise ValueError("invalid path (invalid key): %s" % path)
            elif not chars:
                raise ValueError("invalid path (empty key): %s" % path)
            index_str = "".join(chars)
            # 'int' alone would accept "1_0" and give an unhelpful message
            # for letters; chars are ASCII-only, so isdigit means [0-9]+
            if not index_str.isdigit():
                raise ValueError("invalid path (non-integer index): %s" % path)
            indices.append(int(index_str))
            chars = []
            state = None
        elif char not in _jsonpath_characters:
            raise ValueError("invalid path (illegal characters): %s" % path)
        else:
            if not state:
                raise ValueError("invalid path (missing '.' or '['): %s" % path)
            chars.append(char)
    if state == "[":
        raise ValueError("invalid path (missing closing ']'): %s" % path)
    elif state == ".":
        # A trailing '.' is an empty key, same as one in mid-path
        if not chars:
            raise ValueError("invalid path (empty key): %s" % path)
        indices.append("".join(chars))

    item = obj
    for index in indices:
        try:
            item = item[index]
        except (KeyError, IndexError):
            if default is not _sentinel:
                return default
            raise
    return item


def _build_activity_input(
    input_spec: TaskInput,
    workflow_input: t.Union[t.Dict[str, t.Any], None],
    activity_results: t.Dict[str, t.Any],
) -> t.Any:
    """Build activity input.

    Args:
        input_spec: activity task input specification; a falsy value
            (eg ``None``) is treated as no input
        workflow_input: workflow input
        activity_results: activities' results, keyed by activity task ID;
            tasks without a (completed) result are absent

    Returns:
        activity task input, or ``_sentinel`` when the task takes no input
        (the caller then omits the input entirely)

    Raises:
        KeyError: dependency result or item missing, and no default
            provided
        IndexError: item index out of range, and no default provided
        TypeError: unknown input specification type
    """

    if not input_spec or isinstance(input_spec, NoInput):
        return _sentinel
    if isinstance(input_spec, Constant):
        return input_spec.value
    if isinstance(input_spec, WorkflowInput):
        return _get_item_jsonpath(input_spec.path, workflow_input, input_spec.default)
    if isinstance(input_spec, DependencyResult):
        if input_spec.id not in activity_results:
            # Task completed without a result (or hasn't completed): the
            # whole result is missing, so the default applies if given
            if input_spec.default is not _sentinel:
                return input_spec.default
            raise KeyError(input_spec.id)
        dependency_result = activity_results[input_spec.id]
        return _get_item_jsonpath(
            input_spec.path, dependency_result, input_spec.default
        )
    if isinstance(input_spec, Object):
        input_ = {}
        for key, subspec in input_spec.items.items():
            value = _build_activity_input(subspec, workflow_input, activity_results)
            # Items resolving to no input are left out of the mapping
            if value is not _sentinel:
                input_[key] = value
        return input_
    raise TypeError(input_spec)


class DAGBuilder(_base.DecisionsBuilder):
    """SWF decision builder from DAG-type workflow specification.

    From the decision task's event history: activity tasks are scheduled
    once all their dependencies have completed, the workflow is completed
    once every activity task has completed, and new error events cause the
    workflow to be failed or cancelled instead.
    """

    def __init__(self, workflow: "DAGWorkflow", task):
        super().__init__(workflow, task)
        self.workflow = workflow
        # Scheduled event for each activity event, keyed by activity event ID
        self._scheduled = {}
        # Activity events of each task, in history order, keyed by task ID
        self._activity_task_events = {at.id: [] for at in workflow.task_specs}
        self._new_events = None
        self._error_events = []
        self._ready_activities = set()

    def _is_task_completed(self, task_id: str) -> bool:
        """Whether the task's latest activity event is a completion."""
        events = self._activity_task_events[task_id]
        return bool(events) and events[-1]["eventType"] == "ActivityTaskCompleted"

    def _schedule_task(self, activity_task: Task):
        """Add a decision to schedule an activity task.

        Args:
            activity_task: task to schedule; its dependencies are asserted
                to have completed
        """

        workflow_started_event = self.task["events"][0]
        assert workflow_started_event["eventType"] == "WorkflowExecutionStarted"
        attrs = workflow_started_event["workflowExecutionStartedEventAttributes"]

        decision_attributes = {
            "activityId": activity_task.id,
            "activityType": activity_task.type,
        }

        # Build input from workflow input and completed tasks' results
        input_spec = activity_task.input
        workflow_input = json.loads(attrs.get("input", "null"))
        activity_results = {}
        for activity_task_id, events in self._activity_task_events.items():
            if activity_task_id in (activity_task.dependencies or []):
                assert events[-1]["eventType"] == "ActivityTaskCompleted"
            elif not self._is_task_completed(activity_task_id):
                continue
            d_attrs = events[-1].get("activityTaskCompletedEventAttributes", {})
            if "result" in d_attrs:
                activity_results[activity_task_id] = json.loads(d_attrs["result"])
        input_ = _build_activity_input(input_spec, workflow_input, activity_results)
        if input_ is not _sentinel:
            decision_attributes["input"] = json.dumps(input_)

        # Set other attributes
        if activity_task.heartbeat is not None:
            decision_attributes["heartbeatTimeout"] = str(activity_task.heartbeat)
        if activity_task.timeout is not None:
            decision_attributes["startToCloseTimeout"] = str(activity_task.timeout)
        if activity_task.task_list is not None:
            decision_attributes["taskList"] = {"name": activity_task.task_list}
        if activity_task.priority is not None:
            decision_attributes["taskPriority"] = str(activity_task.priority)

        decision = {
            "decisionType": "ScheduleActivityTask",
            "scheduleActivityTaskDecisionAttributes": decision_attributes,
        }
        self.decisions.append(decision)

    def _get_scheduled_references(self):
        """Map each activity event's ID to its task's scheduled event."""
        # Index by ID once rather than scanning the history per event
        events_by_id = {event["eventId"]: event for event in self.task["events"]}
        for event in self.task["events"]:
            if event["eventType"] not in _activity_events:
                continue
            if event["eventType"] == "ActivityTaskScheduled":
                self._scheduled[event["eventId"]] = event
            else:
                attrs = event[_attr_keys[event["eventType"]]]
                scheduled_event = events_by_id[attrs["scheduledEventId"]]
                self._scheduled[event["eventId"]] = scheduled_event

    def _get_activity_task_events(self):
        """Group activity events by the activity (task) ID they belong to."""
        for event in self.task["events"]:
            if event["eventType"] in _activity_events:
                scheduled_event = self._scheduled[event["eventId"]]
                attrs = scheduled_event["activityTaskScheduledEventAttributes"]
                self._activity_task_events[attrs["activityId"]].append(event)

    def _process_activity_task_completed_event(self, event: t.Dict[str, t.Any]):
        """Mark dependants of the completed task ready if fully satisfied."""
        scheduled_event = self._scheduled[event["eventId"]]
        attrs = scheduled_event["activityTaskScheduledEventAttributes"]
        dependants_task = self.workflow.dependants[attrs["activityId"]]
        for activity_task_id in dependants_task:
            assert not self._activity_task_events[activity_task_id]
            task = next(a for a in self.workflow.task_specs if a.id == activity_task_id)
            if all(self._is_task_completed(dep) for dep in task.dependencies):
                self._ready_activities.add(task.id)

    def _complete_workflow(self):
        """Replace decisions with workflow completion if all tasks completed.

        The completion result maps task IDs to their decoded results, for
        tasks which completed with a result.
        """

        if not all(self._is_task_completed(tid) for tid in self._activity_task_events):
            return

        result = {}
        for activity_id, events in self._activity_task_events.items():
            attrs = events[-1].get("activityTaskCompletedEventAttributes")
            if attrs and "result" in attrs:
                result[activity_id] = json.loads(attrs["result"])

        decision = {"decisionType": "CompleteWorkflowExecution"}
        if result:
            decision_attrs = {"result": json.dumps(result)}
            decision["completeWorkflowExecutionDecisionAttributes"] = decision_attrs
        self.decisions = [decision]

    def _fail_workflow(self, reason=None, details=None):
        """Replace decisions with a workflow failure."""
        decision_attrs = {}
        if reason:
            decision_attrs["reason"] = reason
        if details:
            decision_attrs["details"] = details
        decision = {"decisionType": "FailWorkflowExecution"}
        if decision_attrs:
            decision["failWorkflowExecutionDecisionAttributes"] = decision_attrs
        self.decisions = [decision]

    def _process_decision_failed(self, event: t.Dict[str, t.Any]) -> bool:
        """Handle a failed-decision event.

        Args:
            event: failed-decision event

        Returns:
            whether the decisions are final (no further error processing)

        Raises:
            _base.DeciderError: this decider made a non-permitted decision,
                or the failure cause is not handled
        """

        event_ids = [e["eventId"] for e in self.task["events"]]
        attrs = event[_attr_keys[event["eventType"]]]
        if attrs["cause"] == "OPERATION_NOT_PERMITTED":
            # Find which decider made the failing decision (SWF attribute
            # key is lower-camel-case 'decisionTaskCompletedEventId')
            idx = event_ids.index(attrs["decisionTaskCompletedEventId"])
            dc_event = self.task["events"][idx]
            dc_attrs = dc_event["decisionTaskCompletedEventAttributes"]
            idx = event_ids.index(dc_attrs["startedEventId"])
            ds_event = self.task["events"][idx]
            ds_attrs = ds_event["decisionTaskStartedEventAttributes"]
            this_ds_event = self.task["events"][-1]
            this_ds_attrs = this_ds_event["decisionTaskStartedEventAttributes"]
            # 'identity' is optional in SWF decision-task-started attributes
            if ds_attrs.get("identity") == this_ds_attrs.get("identity"):
                raise _base.DeciderError("Not permitted")
            return False
        elif attrs["cause"] != "UNHANDLED_DECISION":
            raise _base.DeciderError()

        if event["eventType"] == "CancelWorkflowExecutionFailed":
            self.decisions = [{"decisionType": "CancelWorkflowExecution"}]
            return True
        elif event["eventType"] == "FailWorkflowExecutionFailed":
            return False
        elif event["eventType"] == "CompleteWorkflowExecutionFailed":
            self._complete_workflow()
            return True
        return False

    def _schedule_initial_activity_tasks(self):
        """Mark tasks without dependencies ready."""
        for task_id in self.workflow.dependants[None]:
            self._ready_activities.add(task_id)

    def _process_error_events(self):
        """Build decisions from new error events, if any.

        Returns:
            whether there were error events (decisions are then final)
        """

        if not self._error_events:
            return False

        activity_events = []
        decider_events = []
        time_out_events = []
        other_events = []
        for event in self._error_events:
            if event["eventType"] == "ActivityTaskFailed":
                activity_events.append(event)
            elif event["eventType"] == "ActivityTaskTimedOut":
                attr = event["activityTaskTimedOutEventAttributes"]
                if attr["timeoutType"] in ("START_TO_CLOSE", "HEARTBEAT"):
                    activity_events.append(event)
                elif attr["timeoutType"] in ("SCHEDULE_TO_START", "SCHEDULE_TO_CLOSE"):
                    time_out_events.append(event)
            elif event["eventType"] == "WorkflowExecutionCancelRequested":
                self.decisions = [{"decisionType": "CancelWorkflowExecution"}]
                return True
            elif event["eventType"] in _decision_failed_events:
                if self._process_decision_failed(event):
                    return True
                decider_events.append(event)
            elif event["eventType"] in (
                "DecisionTaskTimedOut",
                "WorkflowExecutionTimedOut",
            ):
                time_out_events.append(event)
            elif event["eventType"] == "RecordMarkerFailed":
                other_events.append(event)

        details = []
        if activity_events:
            details.append("%d activities failed" % len(activity_events))
        if decider_events:
            details.append("%d decisions failed" % len(decider_events))
        if time_out_events:
            details.append("%d actions timed-out" % len(time_out_events))
        if other_events:
            details.append("%d other actions failed" % len(other_events))
        details = ", ".join(details)
        self._fail_workflow(details=details)
        return True

    def _process_event(self, event: t.Dict[str, t.Any]):
        """Update ready tasks from a (non-error) new event."""
        if event["eventType"] == "ActivityTaskCompleted":
            self._process_activity_task_completed_event(event)
        elif event["eventType"] == "WorkflowExecutionStarted":
            self._schedule_initial_activity_tasks()

    def _get_new_events(self):
        """Find events since the previous decision task started."""
        event_ids = [event["eventId"] for event in self.task["events"]]
        current_idx = event_ids.index(self.task["startedEventId"])
        previous_idx = -1
        if self.task["previousStartedEventId"] in event_ids:
            previous_idx = event_ids.index(self.task["previousStartedEventId"])
        events = self.task["events"][previous_idx + 1 : current_idx + 1]
        logger.debug(
            "Processing %d events from index %d (ID: %s) to %d (ID: %s)",
            len(events),
            previous_idx + 1,
            events[0]["eventId"],
            current_idx,
            events[-1]["eventId"],
        )
        self._new_events = events

    def _schedule_tasks(self):
        """Schedule all ready tasks."""
        # Iterate specs rather than the set so decision order is stable
        for task in self.workflow.task_specs:
            if task.id in self._ready_activities:
                assert not self._activity_task_events[task.id]
                self._schedule_task(task)

    def _process_new_events(self):
        """Build decisions from new events."""
        assert self.task["events"][-1]["eventType"] == "DecisionTaskStarted"
        assert self.task["events"][-2]["eventType"] == "DecisionTaskScheduled"

        # The last two events are this decision task's scheduled/started
        for event in self._new_events[:-2]:
            if event["eventType"] in _error_events:
                self._error_events.append(event)
        if self._process_error_events():
            return

        for event in self._new_events[:-2]:
            self._process_event(event)
        self._schedule_tasks()
        self._complete_workflow()

    def build_decisions(self):
        self._get_scheduled_references()
        self._get_activity_task_events()
        self._get_new_events()
        self._process_new_events()
class DAGWorkflow(_base.Workflow):
    """DAG-type SWF workflow specification.

    Args:
        name: workflow name
        version: workflow version
        task_specs: DAG task specifications
        description: passed on to the base workflow
    """

    spec_type = "dag"
    decisions_builder = DAGBuilder
    _task_cls = Task
    # IDs of the tasks which depend on each task, keyed by task ID; the
    # ``None`` key lists the tasks without dependencies
    dependants: t.Dict[t.Union[None, str], t.List[str]]

    def __init__(self, name, version, task_specs: t.List[Task], description=None):
        super().__init__(name, version, description)
        self.task_specs = task_specs
        self.dependants = {None: []}

    @classmethod
    def _args_from_spec(cls, spec):
        args, kwargs = super()._args_from_spec(spec)
        tasks = [cls._task_cls.from_spec(s) for s in spec["tasks"]]
        args += (tasks,)
        return args, kwargs

    def _build_dependants(self):
        """(Re)build the task dependants map from the task specifications."""
        dependants = {None: []}
        for activity_task in self.task_specs:
            dependants[activity_task.id] = [
                other.id
                for other in self.task_specs
                if activity_task.id in (other.dependencies or [])
            ]
            if not activity_task.dependencies:
                dependants[None].append(activity_task.id)
        # Rebuild from scratch (repeat calls used to duplicate root tasks),
        # updating in place to keep the same dict object
        self.dependants.clear()
        self.dependants.update(dependants)

    def setup(self):
        """Set up the workflow: build the task dependants map."""
        self._build_dependants()