"""SWF decider."""
import uuid
import socket
import typing as t
import logging as lg
import pathlib
from concurrent import futures as cf
from . import _specs, _util
logger = lg.getLogger(__name__)
[docs]
class UnsupportedWorkflow(LookupError):
    """Decider doesn't support workflow.

    Raised when no workflow specification can be found for the workflow
    type (name and version) of a decision task.
    """
[docs]
class Decider:
    """SWF decider.

    Polls an SWF decider task-list for decision tasks, makes decisions using
    the matching workflow specification and sends them back to SWF.

    Args:
        workflows_spec_file: workflows specifications file path
        domain: SWF domain to poll in
        task_list: SWF decider task-list
        identity: decider identity, default: automatically generated from
            fully-qualified domain-name and a UUID

    Attributes:
        workflows_spec_file (pathlib.Path): workflows specifications file path
        domain (str): SWF domain polled
        task_list (str): SWF decider task-list polled
        client (botocore.client.BaseClient): SWF client
        identity (str): name of decider to poll as
    """

    def __init__(
        self,
        workflows_spec_file: pathlib.Path,
        domain: str,
        task_list: str,
        identity: t.Optional[str] = None,
    ):
        self.workflows_spec_file = workflows_spec_file
        self.domain = domain
        self.task_list = task_list
        # NOTE(review): the 70 s read timeout presumably leaves headroom over
        # SWF's 60 s long-poll window -- confirm against _util.get_swf_client.
        self.client = _util.get_swf_client(socket_read_timeout=70.0)
        # Fall back to "<fqdn>-<first 8 characters of a random UUID>".
        self.identity = identity or (socket.getfqdn() + "-" + str(uuid.uuid4())[:8])
        # Future of the decision task currently being handled (if any).
        self._future = None

    def _poll_for_decision_task(self) -> t.Dict[str, t.Any]:
        """Poll for a decision task from SWF.

        See https://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForDecisionTask.html

        Returns:
            decision task
        """
        _kwargs = {
            "domain": self.domain,
            "identity": self.identity,
            "taskList": {"name": self.task_list},
        }
        # NOTE(review): presumably collects every page of "events" into one
        # response -- confirm against _util.list_paginated.
        return _util.list_paginated(
            self.client.poll_for_decision_task, "events", _kwargs
        )

    def _get_workflow(self, task: t.Dict[str, t.Any]) -> _specs.Workflow:
        """Get workflow specification for task.

        Args:
            task: decision task

        Returns:
            workflow specification

        Raises:
            UnsupportedWorkflow: no specification matches the task's workflow
                type
        """
        name = task["workflowType"]["name"]
        version = task["workflowType"]["version"]
        try:
            return _specs.get_workflow(name, version, self.workflows_spec_file)
        except _specs.WorkflowNotFound as e:
            raise UnsupportedWorkflow(task["workflowType"]) from e

    def _respond_decision_task_completed(
        self, decisions: t.List[t.Dict[str, t.Any]], task: t.Dict[str, t.Any]
    ):
        """Send decisions to SWF.

        See https://docs.aws.amazon.com/amazonswf/latest/apireference/API_RespondDecisionTaskCompleted.html

        Args:
            decisions: workflow decisions
            task: decision task
        """
        logger.debug(
            "Sending %d decisions for task '%s'", len(decisions), task["taskToken"]
        )
        self.client.respond_decision_task_completed(
            taskToken=task["taskToken"], decisions=decisions
        )

    def _poll_and_run(self):
        """Perform poll, and possibly run decision task.

        The decision task is handled in a worker thread and its future is
        kept in ``self._future`` so that ``run`` can still wait for it after
        the polling thread has been interrupted.
        """
        task = self._poll_for_decision_task()
        logger.debug("Decision task: %s", task)
        # An empty poll result carries an empty task token, or may omit the
        # key altogether; either way there is nothing to decide on.
        if not task.get("taskToken"):
            return
        executor = cf.ThreadPoolExecutor(max_workers=1)
        try:
            self._future = executor.submit(self._decide_and_respond, task)
            self._future.result()
        finally:
            # Release the worker thread once its task is finished; don't
            # block here so an interrupt still reaches ``run`` promptly.
            executor.shutdown(wait=False)

    def _decide_and_respond(self, task):
        """Make and respond with decisions.

        If making decisions fails, the decisions built by
        ``_specs.make_decisions_on_error`` are sent instead and the original
        exception is re-raised after responding.

        Args:
            task: decision task

        Raises:
            UnsupportedWorkflow: no specification matches the task's workflow
                type (nothing is sent to SWF in that case)
        """
        logger.info(
            "Got decision task '%s' for workflow '%s-%s' execution '%s' (run '%s')",
            task["taskToken"],
            task["workflowType"]["name"],
            task["workflowType"]["version"],
            task["workflowExecution"]["workflowId"],
            task["workflowExecution"]["runId"],
        )
        try:
            workflow = self._get_workflow(task)
        except UnsupportedWorkflow:
            logger.error("Unsupported workflow type: %s", task["workflowType"])
            raise
        workflow.setup()
        exc = None
        try:
            decisions = workflow.make_decisions(task)
        except Exception as e:
            decisions = _specs.make_decisions_on_error(e)
            exc = e
        self._respond_decision_task_completed(decisions, task)
        if exc:
            raise exc

    def _run_uncaught(self):
        """Run decider: poll for and handle decision tasks forever."""
        _fmt = "Polling for tasks in domain '%s' with task-list '%s' as '%s'"
        # Level 25 lies between logging.INFO (20) and logging.WARNING (30).
        logger.log(25, _fmt, self.domain, self.task_list, self.identity)
        while True:
            self._poll_and_run()

    def run(self):
        """Run decider.

        Polls until interrupted. On keyboard-interrupt, waits for the
        decision task currently being handled (if any) before returning;
        an exception raised by that task propagates to the caller.
        """
        try:
            self._run_uncaught()
        except KeyboardInterrupt:
            logger.info("Quitting due to keyboard-interrupt")
            # ``not done()`` also covers a task that was submitted but has
            # not started running yet.
            if self._future is not None and not self._future.done():
                logger.log(25, "Waiting on current decision task to be handled")
                self._future.result()
[docs]
def run_app(
    workflows_spec_file: pathlib.Path, domain: str, task_list: str, identity: str = None
):
    """Build a decider from the given settings and run it.

    Arguments:
        workflows_spec_file: path of the workflows specifications file
        domain: SWF domain
        task_list: SWF decider task-list
        identity: decider identity; generated automatically when omitted
    """
    Decider(
        workflows_spec_file=workflows_spec_file,
        domain=domain,
        task_list=task_list,
        identity=identity,
    ).run()