pulp.tasking

pulpcore.tasking

pulp.tasking.worker

pulpcore.tasking.worker

PulpcoreWorker()

handle_worker_heartbeat()

Create or update worker heartbeat records.

Existing Worker objects are searched for one to update. If one is found, it is updated; otherwise a new Worker entry is created. The event is also logged at the info level.
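The create-or-update behaviour described above can be sketched with plain Python. This is only an illustration: the `workers` dict and the `record_heartbeat` helper are hypothetical stand-ins for the Worker records and are not part of pulpcore.

```python
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

# Hypothetical in-memory stand-in for the stored Worker records.
workers = {}


def record_heartbeat(name, now=None):
    """Create or update the heartbeat entry for `name`.

    Returns True if a new entry was created, False if an existing one
    was updated.
    """
    now = now or datetime.now(timezone.utc)
    created = name not in workers
    workers[name] = {"name": name, "last_heartbeat": now}
    if created:
        logger.info("New worker '%s' discovered", name)
    else:
        logger.info("Worker '%s' heartbeat updated", name)
    return created


first = record_heartbeat("worker-1")   # creates the entry
second = record_heartbeat("worker-1")  # updates the same entry
```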

cancel_abandoned_task(task, final_state, reason=None)

Cancel and clean up an abandoned task.

This function must only be called while holding the lock for that task. It is a no-op if the task is neither in "running" nor "canceling" state.

Return True if the task was actually canceled, False otherwise.
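The state guard and return value described above could look roughly like the following. `FakeTask`, its `note` field, and `cancel_if_abandoned` are hypothetical names used for illustration; the sketch does not model the task lock, which the caller is assumed to hold already.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeTask:
    # Hypothetical stand-in for a task record.
    state: str
    note: Optional[str] = None


def cancel_if_abandoned(task, final_state, reason=None):
    """Move `task` to `final_state` only if it is running or canceling.

    Returns True if the task was changed, False if the call was a no-op.
    """
    if task.state not in ("running", "canceling"):
        return False  # no-op for any other state
    task.state = final_state
    task.note = reason
    return True


running = FakeTask(state="running")
waiting = FakeTask(state="waiting")
changed = cancel_if_abandoned(running, "canceled", reason="worker went away")
untouched = cancel_if_abandoned(waiting, "canceled")
```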

unblock_tasks()

Iterate over waiting tasks and mark them unblocked accordingly.

Returns True if at least one task was unblocked, False otherwise.

iter_tasks()

Iterate over ready tasks and yield each task while holding the lock.

sleep()

Wait for signals on the wakeup channel while heart beating.

supervise_task(task)

Call and supervise the task process while heart beating.

This function must only be called while holding the lock for that task.

handle_available_tasks()

Pick and supervise tasks until there are no more available tasks.

Failing to detect newly available tasks can lead to a stuck state: the workers would go to sleep and would not learn about the unhandled task until an external wakeup event occurs (e.g., a new worker starts up or a new task comes in).
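One way to avoid the stuck state described above is to keep re-scanning until a full pass finds nothing to do. The sketch below shows that loop shape with a plain `deque`; `drain` and the `supervise` callback are hypothetical and do not reproduce the actual worker code.

```python
from collections import deque


def drain(queue, supervise):
    """Handle tasks until a complete pass over `queue` finds none.

    Returns the number of tasks handled.
    """
    handled = 0
    keep_looping = True
    while keep_looping:
        keep_looping = False
        while queue:
            task = queue.popleft()
            supervise(task)
            handled += 1
            # Work was done, so scan once more: new tasks may have
            # arrived while this one was being supervised.
            keep_looping = True
    return handled


queue = deque(["a", "b"])
seen = []


def supervise(task):
    seen.append(task)
    if task == "a":
        queue.append("c")  # a task that shows up while "a" is running


handled = drain(queue, supervise)
```

The late arrival `"c"` is still picked up before the loop ends, which is the property the paragraph above is concerned with.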

pulp.tasking.storage

pulpcore.tasking.storage

WorkerDirectory(hostname)

Bases: _WorkingDir

The directory associated with a pulpcore-worker.

Path format: /

Parameters:

  • hostname (str) –

    The worker hostname.

create()

Create the directory.

The directory is deleted and recreated if it already exists. Only one of these should ever be held at a time for any individual worker.
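The delete-and-recreate semantics can be illustrated with the standard library alone. `recreate_dir` and the `worker-1` directory name are hypothetical; the sketch runs inside a temporary directory and does not touch any real worker path.

```python
import os
import shutil
import tempfile


def recreate_dir(path):
    """Remove `path` if it exists, then create it as an empty directory."""
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path)
    return path


base = tempfile.mkdtemp()
worker_dir = os.path.join(base, "worker-1")  # hypothetical directory name

recreate_dir(worker_dir)
# Leave a stale file behind, as a previous run might have done.
open(os.path.join(worker_dir, "stale.txt"), "w").close()

recreate_dir(worker_dir)
leftover = os.listdir(worker_dir)  # contents after the second create

shutil.rmtree(base)  # clean up the temporary area
```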

get_worker_path(hostname)

Get the root directory path for a worker by hostname.

Format: /

Parameters:

  • hostname (str) –

    The worker hostname.

Returns:

  • str

    The absolute path to a worker's root directory.

pulp.tasking.tasks

pulpcore.tasking.tasks

dispatch(func, args=None, kwargs=None, task_group=None, exclusive_resources=None, shared_resources=None, immediate=False, deferred=True, versions=None)

Enqueue a message to Pulp workers with a reservation.

This method provides normal enqueue functionality, while also requesting necessary locks for serialized urls. No two tasks that claim the same resource can execute concurrently. It accepts resources which it transforms into a list of urls (one for each resource).

This method creates a pulpcore.app.models.Task object and returns it.

The values in args and kwargs must be JSON serializable, but may contain instances of uuid.UUID.
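To make the serialization constraint concrete, the sketch below shows one way values containing `uuid.UUID` instances can be turned into JSON with the standard library. The `UUIDEncoder` class is an illustrative assumption and is not taken from pulpcore; it only demonstrates which kinds of values pass and which fail.

```python
import json
import uuid


class UUIDEncoder(json.JSONEncoder):
    """JSON encoder that additionally accepts uuid.UUID values."""

    def default(self, obj):
        if isinstance(obj, uuid.UUID):
            return str(obj)
        # Anything else that json cannot handle raises TypeError.
        return super().default(obj)


repo_id = uuid.UUID("12345678-1234-5678-1234-567812345678")
payload = json.dumps(
    {"args": [repo_id], "kwargs": {"mirror": True}},
    cls=UUIDEncoder,
)

try:
    json.dumps({"args": [object()]}, cls=UUIDEncoder)
    rejected = False
except TypeError:
    rejected = True
```

Passing arbitrary objects (for example, model instances) in args or kwargs would fail in the same way as the `object()` case here, so primary keys or other plain values are the safer choice.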

Parameters:

  • func (callable | str) –

    The function to be run when the necessary locks are acquired.

  • args (tuple, default: None ) –

    The positional arguments to pass on to the task.

  • kwargs (dict, default: None ) –

    The keyword arguments to pass on to the task.

  • task_group (TaskGroup, default: None ) –

    A TaskGroup to add the created Task to.

  • exclusive_resources (list, default: None ) –

    A list of resources this task needs exclusive access to while running. Each resource can be either a str or a django.db.models.Model instance.

  • shared_resources (list, default: None ) –

    A list of resources this task needs non-exclusive access to while running. Each resource can be either a str or a django.db.models.Model instance.

  • immediate (bool, default: False ) –

    Whether to allow running this task immediately. It must be guaranteed to execute fast without blocking. If not all resource constraints are met, the task will either be returned in a canceled state or, if deferred is True, be left in the queue to be picked up by a worker eventually. Defaults to False.

  • deferred (bool, default: True ) –

    Whether to allow deferring the task to a pulpcore_worker. Defaults to True. immediate and deferred cannot both be False.

  • versions (Optional[Dict[str, str]], default: None ) –

    Minimum versions of components by app_label the worker must provide to handle the task.

Returns (pulpcore.app.models.Task): The Pulp Task that was created.

Raises:

  • ValueError

    When resources is an unsupported type.
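The interaction between exclusive_resources and shared_resources can be reasoned about with a small conflict check. The sketch assumes that an exclusive claim conflicts with any other claim on the same resource, while shared claims conflict only with exclusive ones; `conflicts` and the example resource strings are hypothetical and do not reflect how pulpcore stores or compares reservations.

```python
def conflicts(running, candidate):
    """Return True if `candidate` may not run alongside `running`.

    Each argument is an (exclusive, shared) pair of sets of resource
    strings.
    """
    run_excl, run_shared = running
    cand_excl, cand_shared = candidate
    return bool(
        # The candidate wants exclusive access to something already claimed.
        cand_excl & (run_excl | run_shared)
        # The candidate wants shared access to something held exclusively.
        or cand_shared & run_excl
    )


repo = "/repositories/1/"   # hypothetical resource identifiers
other = "/repositories/2/"

both_exclusive = conflicts(({repo}, set()), ({repo}, set()))
both_shared = conflicts((set(), {repo}), (set(), {repo}))
mixed = conflicts((set(), {repo}), ({repo}, set()))
unrelated = conflicts(({repo}, set()), ({other}, set()))
```

Under these assumptions only the `both_shared` and `unrelated` pairs could run concurrently.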

cancel_task(task_id)

Cancel the task that is represented by the given task_id.

This method cancels only the task with the given task_id, not the tasks it spawned. It also updates the task's state to 'canceling'.

Parameters:

  • task_id (str) –

    The ID of the task you wish to cancel.

Raises:

  • NotFound

    If a task with the given task_id does not exist.

cancel_task_group(task_group_id)

Cancel the task group that is represented by the given task_group_id.

This method attempts to cancel all tasks in the task group.

Parameters:

  • task_group_id (str) –

    The ID of the task group you wish to cancel.

Raises:

  • DoesNotExist

    If a task group with the given task_group_id does not exist.

pulp.tasking._util

pulpcore.tasking._util

delete_incomplete_resources(task)

Delete all incomplete created-resources on a canceled task.

Parameters:

  • task (Task) –

    A task.

perform_task(task_pk, task_working_dir_rel_path)

Set up the environment to handle a task and execute it. This must be called as a subprocess, while the parent holds the advisory lock of the task.