Source code for seddy.registration

"""SWF workflow registration."""

import typing as t
import logging as lg
import pathlib

from . import _specs, _util

# Module-level logger, named after this module via ``__name__``.
logger = lg.getLogger(__name__)


def list_workflows(domain: str, client) -> t.Dict[t.Tuple[str, str], bool]:
    """Collect the workflow types SWF knows about in a domain.

    Both registered and deprecated workflow types are queried.

    Args:
        domain: domain to list workflows of
        client (botocore.client.BaseClient): SWF client

    Returns:
        mapping of ``(name, version)`` to registration status: ``True`` for
        registered workflows, ``False`` for deprecated ones
    """
    logger.info("Listing workflows in '%s'", domain)

    # Run both queries up-front (registered first, then deprecated) before
    # unpacking either response
    responses = {}
    for status in ("REGISTERED", "DEPRECATED"):
        query = {"domain": domain, "registrationStatus": status}
        responses[status] = _util.list_paginated(
            client.list_workflow_types, "typeInfos", query
        )

    # Pull the workflow-type records out of each response
    registered_types = [
        info["workflowType"] for info in responses["REGISTERED"]["typeInfos"]
    ]
    deprecated_types = [
        info["workflowType"] for info in responses["DEPRECATED"]["typeInfos"]
    ]

    # Merge: deprecated entries are applied last, so they win on a key clash
    statuses = dict.fromkeys(
        ((wt["name"], wt["version"]) for wt in registered_types), True
    )
    statuses.update(
        dict.fromkeys(((wt["name"], wt["version"]) for wt in deprecated_types), False)
    )
    return statuses
def register_workflow(workflow: _specs.Workflow, domain: str, client):
    """Register a workflow type with SWF.

    Optional registration defaults are only sent to SWF when they are set
    (i.e. not ``None``) on the workflow specification.

    Args:
        workflow: specification of workflow to register
        domain: domain to register workflow in
        client (botocore.client.BaseClient): SWF client
    """
    logger.info(
        "Registering workflow '%s' (version %s) on domain '%s'",
        workflow.name,
        workflow.version,
        domain,
    )

    # Build the optional registration arguments
    options = {}
    if workflow.description is not None:
        options["description"] = workflow.description

    defaults = workflow.registration
    if defaults:
        # (SWF option name, registration attribute, value converter)
        option_table = (
            ("defaultTaskStartToCloseTimeout", "task_timeout", str),
            ("defaultExecutionStartToCloseTimeout", "execution_timeout", str),
            ("defaultTaskList", "task_list", lambda name: {"name": name}),
            ("defaultTaskPriority", "task_priority", str),
            ("defaultChildPolicy", "child_policy", lambda policy: policy.value),
            ("defaultLambdaRole", "lambda_role", lambda role: role),
        )
        for option, attribute, convert in option_table:
            value = getattr(defaults, attribute)
            if value is not None:
                options[option] = convert(value)

    # Register
    client.register_workflow_type(
        domain=domain,
        name=workflow.name,
        version=workflow.version,
        **options,
    )
def deprecate_workflow(workflow: _specs.Workflow, domain: str, client):
    """Mark a workflow type as deprecated in SWF.

    Args:
        workflow: specification of workflow to deprecate
        domain: domain to deprecate workflow in
        client (botocore.client.BaseClient): SWF client
    """
    logger.info(
        "Deprecating workflow '%s' (version %s) in domain '%s'",
        workflow.name,
        workflow.version,
        domain,
    )
    client.deprecate_workflow_type(
        domain=domain,
        workflowType={"name": workflow.name, "version": workflow.version},
    )
def undeprecate_workflow(workflow: _specs.Workflow, domain: str, client):
    """Restore a deprecated workflow type in SWF.

    Args:
        workflow: specification of workflow to undeprecate
        domain: domain to undeprecate workflow in
        client (botocore.client.BaseClient): SWF client
    """
    logger.info(
        "Undeprecating workflow '%s' (version %s) in domain '%s'",
        workflow.name,
        workflow.version,
        domain,
    )
    client.undeprecate_workflow_type(
        domain=domain,
        workflowType={"name": workflow.name, "version": workflow.version},
    )
def _sync_workflow(
    workflow: _specs.Workflow,
    domain: str,
    existing: t.Dict[t.Tuple[str, str], bool],
    client,
):
    """Bring one workflow's SWF registration in line with its specification.

    Args:
        workflow: specification of workflow to register
        domain: domain to register workflow in
        existing: registration status of workflows already in SWF, keyed by
            ``(name, version)``: ``True`` if registered, ``False`` if
            deprecated
        client (botocore.client.BaseClient): SWF client
    """
    registration = workflow.registration
    # A workflow without registration settings is treated as active
    is_active = registration.active if registration else True
    key = (workflow.name, workflow.version)

    # Unknown to SWF: register it, unless it is meant to be inactive
    if key not in existing:
        if is_active:
            register_workflow(workflow, domain, client)
        return

    # Known to SWF and already in the desired state: nothing to do
    if existing[key] is is_active:
        logger.debug(
            "Skipping up-to-date workflow '%s' (version %s, active: %s)",
            workflow.name,
            workflow.version,
            is_active,
        )
        return

    # Known to SWF but in the wrong state: flip it
    if is_active:
        undeprecate_workflow(workflow, domain, client)
    else:
        deprecate_workflow(workflow, domain, client)
def register_workflows(workflows: t.List[_specs.Workflow], domain: str):
    """Synchronise workflow registration with SWF.

    Lists the workflows currently in the domain once, then synchronises
    each given workflow specification against that listing.

    Args:
        workflows: specifications of workflows to register
        domain: domain to register workflows in
    """
    client = _util.get_swf_client()
    # Custom level 25: between the standard INFO (20) and WARNING (30)
    logger.log(25, "Registering workflows in '%s'", domain)

    # Get existing workflows
    existing = list_workflows(domain, client)
    logger.debug("Existing workflows: %s", existing)

    # Register workflows
    for workflow in workflows:
        _sync_workflow(workflow, domain, existing, client)
def run_app(workflows_spec_file: pathlib.Path, domain: str):
    """Run the registration synchronisation application.

    Loads the workflow specifications from file and synchronises their
    registration with SWF.

    Args:
        workflows_spec_file: workflows specifications file path
        domain: SWF domain
    """
    register_workflows(_specs.load_workflows(workflows_spec_file), domain)