ogdc_runner.argo module

class ogdc_runner.argo.ArgoConfig(namespace: str, service_account_name: str, workflows_service_url: str, runner_image: str, runner_image_tag: str, image_pull_policy: str, workflow_retry_strategy: models.RetryStrategy | None)

Bases: object

Configuration for Argo workflows.

Parameters:
  • namespace (str)

  • service_account_name (str)

  • workflows_service_url (str)

  • runner_image (str)

  • runner_image_tag (str)

  • image_pull_policy (str)

  • workflow_retry_strategy (models.RetryStrategy | None)

property full_image_path: str

Return the full image path with tag.
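
As a rough illustration of what this property describes, the sketch below uses a hypothetical stand-in class, not the real ArgoConfig. It assumes the image and tag are joined with a colon, as in a conventional container image reference; the image name is invented for the example.

```python
from dataclasses import dataclass


@dataclass
class ImageConfigSketch:
    """Hypothetical stand-in holding only the two image-related fields."""

    runner_image: str
    runner_image_tag: str

    @property
    def full_image_path(self) -> str:
        # Assumption: image and tag are joined with ":" (the usual
        # container image reference form).
        return f"{self.runner_image}:{self.runner_image_tag}"


# "example.org/ogdc-runner" is an invented image name for illustration.
cfg = ImageConfigSketch(
    runner_image="example.org/ogdc-runner",
    runner_image_tag="latest",
)
print(cfg.full_image_path)
```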

class ogdc_runner.argo.ArgoManager

Bases: object

Manager for Argo workflow configurations and services.

property config: ArgoConfig

Get the current configuration.

update_image(image: str | None = None, tag: str | None = None, pull_policy: str | None = None) → None

Update the runner image configuration and re-apply global config.

Parameters:
  • image (str | None) – New image path (without tag)

  • tag (str | None) – New image tag

  • pull_policy (str | None) – New image pull policy

Return type:

None
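
Because all three arguments default to None, one plausible reading is that an omitted argument leaves the corresponding setting unchanged. The sketch below illustrates that partial-update pattern with hypothetical names (ImageSettingsSketch, update_image_sketch). It is an assumption about the behaviour, not the actual ArgoManager implementation, and it does not re-apply any global configuration.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ImageSettingsSketch:
    """Hypothetical container for the three image-related settings."""

    image: str
    tag: str
    pull_policy: str


def update_image_sketch(
    current: ImageSettingsSketch,
    image: str | None = None,
    tag: str | None = None,
    pull_policy: str | None = None,
) -> ImageSettingsSketch:
    # Assumption: None means "keep the existing value".
    candidates = {"image": image, "tag": tag, "pull_policy": pull_policy}
    overrides = {key: value for key, value in candidates.items() if value is not None}
    return replace(current, **overrides)


before = ImageSettingsSketch(
    image="example.org/ogdc-runner",  # invented image name
    tag="latest",
    pull_policy="IfNotPresent",
)
after = update_image_sketch(before, tag="v1.2.3")
print(after)
```

Only the tag changes in this example; the image path and pull policy carry over from the earlier settings.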

property workflow_service: hera.workflows.WorkflowsService

Get the current workflow service.

ogdc_runner.argo.OgdcWorkflow(*, recipe_config: RecipeConfig, name: str, archive_workflow: bool, **kwargs: Any) → Generator[hera.workflows.Workflow, None, None]

Context manager that yields an Argo workflow whose configuration is driven by recipe_config.

kwargs:

  • recipe_config: Recipe configuration that is driving this workflow.

  • name: Name of this workflow. Used with the recipe ID to generate a name for the Argo workflow.

  • archive_workflow: Set to True to archive this workflow when it succeeds. Archiving is recommended for workflows that transform data, for provenance and metrics reasons. Workflows that perform a task unrelated to data transformation can set this to False.

This context manager sets the following on hera.workflows.Workflow:

  • generate_name: based on recipe ID and the name.

  • workflows_service: Uses OGDC’s configured Argo workflows service.

  • labels: Adds ogdc/persist-workflow-in-archive label based on archive_workflow kwarg. Other passed labels are preserved.

  • ttl_strategy: Set to 7 days if the recipe’s output type is temporary, to allow sufficient time for the submitter of the recipe to retrieve the results. Otherwise this is left unset.

All other kwargs are passed directly to hera.workflows.Workflow.

Parameters:
  • recipe_config (RecipeConfig)

  • name (str)

  • archive_workflow (bool)

  • kwargs (Any)

Return type:

Generator[Workflow, None, None]
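
The settings listed above can be sketched as a plain function that assembles keyword arguments for a workflow. Everything here is hypothetical and simplified: the function name, the output_is_temporary flag, the "true"/"false" label values, the hyphen-joined generate_name, and the dictionary form of ttl_strategy are all assumptions made for illustration. The real context manager derives these from recipe_config and builds a Hera workflow.

```python
from typing import Any

SEVEN_DAYS_IN_SECONDS = 7 * 24 * 60 * 60  # 604800


def build_workflow_kwargs_sketch(
    *,
    recipe_id: str,
    name: str,
    archive_workflow: bool,
    output_is_temporary: bool,
    **kwargs: Any,
) -> dict[str, Any]:
    # Preserve any labels the caller passed, then add the archive label.
    labels = dict(kwargs.pop("labels", None) or {})
    # Assumption: the label value is the string "true" or "false".
    labels["ogdc/persist-workflow-in-archive"] = "true" if archive_workflow else "false"

    result: dict[str, Any] = {
        # Assumption: recipe ID and name are joined with hyphens.
        "generate_name": f"{recipe_id}-{name}-",
        "labels": labels,
        **kwargs,  # all other kwargs pass through untouched
    }
    if output_is_temporary:
        # Assumption: the TTL is expressed as seconds after completion.
        result["ttl_strategy"] = {"seconds_after_completion": SEVEN_DAYS_IN_SECONDS}
    return result


example = build_workflow_kwargs_sketch(
    recipe_id="my-recipe",
    name="transform",
    archive_workflow=True,
    output_is_temporary=True,
    labels={"team": "data"},
    entrypoint="main",
)
print(example)
```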

ogdc_runner.argo.get_workflow_status(workflow_name: str) → str | None

Return the given workflow’s status (e.g., ‘Succeeded’).

Parameters:

workflow_name (str)

Return type:

str | None

ogdc_runner.argo.make_generate_name(*, recipe_id: str, suffix: str = '') → str

Create a workflow generate_name, truncating recipe_id if necessary.

Kubernetes names must be no more than 63 characters. Argo appends a 5-character random suffix to generate_name to create the final workflow name. This function truncates the recipe_id as needed to ensure the final name stays within the limit.

Parameters:
  • recipe_id (str) – The recipe identifier to use as the base name

  • suffix (str) – The suffix to append (e.g., “remove-existing-data”)

Return type:

str

Returns:

A generate_name string that will produce a valid Kubernetes name
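
The length arithmetic described above can be worked through in a short sketch. This is a hypothetical re-implementation, not the library's code. It assumes the parts are hyphen-separated and that generate_name ends with a hyphen, and it does not handle an empty recipe ID.

```python
MAX_K8S_NAME_LENGTH = 63  # limit stated in the description above
ARGO_RANDOM_SUFFIX_LENGTH = 5  # random characters Argo appends to generate_name


def make_generate_name_sketch(*, recipe_id: str, suffix: str = "") -> str:
    """Hypothetical version; the real function may join the parts differently."""
    # Assumption: parts are hyphen-separated and the result ends with "-".
    tail = f"-{suffix}-" if suffix else "-"
    # Characters left for the recipe ID once the tail and Argo's random
    # suffix are accounted for.
    budget = MAX_K8S_NAME_LENGTH - ARGO_RANDOM_SUFFIX_LENGTH - len(tail)
    if budget < 1:
        raise ValueError(f"suffix {suffix!r} leaves no room for the recipe ID")
    # Truncate only the recipe ID; drop any trailing "-" left by the cut.
    head = recipe_id[:budget].rstrip("-")
    return f"{head}{tail}"


short_name = make_generate_name_sketch(recipe_id="my-recipe")
long_name = make_generate_name_sketch(
    recipe_id="a" * 100,
    suffix="remove-existing-data",
)
print(short_name, len(long_name) + ARGO_RANDOM_SUFFIX_LENGTH)
```

With the 20-character suffix in this example, the tail takes 22 characters, which leaves 63 - 5 - 22 = 36 characters for the recipe ID.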

ogdc_runner.argo.submit_workflow(workflow: hera.workflows.Workflow, *, wait: bool = False) → str

Submit the given workflow and return its name as a str.

Parameters:
  • workflow (Workflow)

  • wait (bool)

Return type:

str

ogdc_runner.argo.wait_for_workflow_completion(workflow_name: str) → None

Wait for the workflow with the given name to complete.

Parameters:

workflow_name (str)

Return type:

None
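
One common way to wait on a workflow is to poll its status until it reaches a terminal phase. The sketch below shows that pattern with a caller-supplied status function standing in for get_workflow_status. The function name, polling interval, and timeout are hypothetical, and this is not necessarily how wait_for_workflow_completion is implemented. Unlike the documented function, the sketch returns the final phase so the example can display it.

```python
import time
from typing import Callable, Optional

# Argo workflow phases that indicate the workflow has finished.
TERMINAL_PHASES = {"Succeeded", "Failed", "Error"}


def wait_for_completion_sketch(
    workflow_name: str,
    get_status: Callable[[str], Optional[str]],
    *,
    poll_seconds: float = 5.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Poll get_status until a terminal phase is seen or the timeout passes."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status(workflow_name)
        if status in TERMINAL_PHASES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"workflow {workflow_name!r} did not finish in time")
        time.sleep(poll_seconds)


# Fake status source for demonstration: no status yet, then running, then done.
fake_statuses = iter([None, "Running", "Succeeded"])
final_phase = wait_for_completion_sketch(
    "example-workflow",
    lambda _name: next(fake_statuses),
    poll_seconds=0.0,
)
print(final_phase)
```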