
pulp.tasking

pulpcore.tasking

pulp.tasking.worker

pulpcore.tasking.worker

PulpcoreWorker(auxiliary=False)

handle_worker_heartbeat()

Update worker heartbeat records.

If the update fails (the record was deleted, the database is unreachable, ...) the worker is shut down.

cancel_abandoned_task(task, final_state, reason=None)

Cancel and clean up an abandoned task.

This function must only be called while holding the lock for that task. It is a no-op if the task is neither in "running" nor "canceling" state.

Return True if the task was actually canceled, False otherwise.

unblock_tasks()

Iterate over waiting tasks and mark them unblocked accordingly.

This function also handles the communication around unblocking. To prevent multiple workers from attempting to unblock tasks at the same time, it tries to acquire a lock and simply returns if that fails. It also clears the notification about tasks to be unblocked and sends the notification that newly unblocked tasks are available.

Returns the number of new unblocked tasks.

iter_tasks()

Iterate over ready tasks and yield each task while holding the lock.
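
As a rough illustration of "yield each task while holding the lock", the generator below keeps a per-item lock held for as long as the consumer's loop body runs. It is a sketch under assumptions: `iter_locked` and the `locks` mapping are hypothetical, and in-process `threading.Lock` objects replace the locks the worker actually takes.

```python
import threading


def iter_locked(items, locks):
    """Yield each item whose lock can be taken, holding it during the yield."""
    for item in items:
        lock = locks[item]
        if not lock.acquire(blocking=False):
            continue  # someone else is already handling this item
        try:
            # The consumer's loop body executes while the lock is held.
            yield item
        finally:
            # Released when the consumer asks for the next item or stops.
            lock.release()
```

Because the release sits in a `finally` clause, the lock is also dropped if the consumer breaks out of the loop and the generator is closed.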

sleep()

Wait for signals on the wakeup channel while sending heartbeats.

supervise_task(task)

Call and supervise the task process while sending heartbeats.

This function must only be called while holding the lock for that task.

handle_unblocked_tasks()

Pick and supervise tasks until there are no more available tasks.

Failing to detect newly available tasks can lead to a stuck state: the workers would go to sleep and would not learn about the unhandled task until an external wakeup event occurs (e.g., a new worker starts up or a new task arrives).

pulp.tasking.storage

pulpcore.tasking.storage

WorkerDirectory(hostname)

Bases: _WorkingDir

The directory associated with a pulpcore-worker.

Path format: /

Parameters:

  • hostname (str) –

    The worker hostname.

create()

Create the directory.

If the directory already exists, it is deleted and recreated. Only one of these should ever be held at a time for any individual worker.
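
The delete-and-recreate behaviour can be sketched with the standard library alone. `recreate_dir` is a hypothetical helper, not the method's actual code, and the directory name used in the usage lines is made up.

```python
import os
import shutil
import tempfile


def recreate_dir(path):
    """Create ``path`` as an empty directory, removing any previous one."""
    if os.path.isdir(path):
        shutil.rmtree(path)  # drop stale contents from an earlier run
    os.makedirs(path)
    return path


# Usage in a throwaway location.
base = tempfile.mkdtemp()
workdir = os.path.join(base, "worker-1")  # hypothetical directory name
recreate_dir(workdir)
open(os.path.join(workdir, "stale.txt"), "w").close()
recreate_dir(workdir)  # the stale file does not survive this call
leftover = os.listdir(workdir)
shutil.rmtree(base)
```

Starting from an empty directory means a worker never sees files left behind by a previous process that used the same path.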

get_worker_path(hostname)

Get the root directory path for a worker by hostname.

Format: /

Parameters:

  • hostname (str) –

    The worker hostname.

Returns:

  • str

    The absolute path to a worker's root directory.

pulp.tasking.tasks

pulpcore.tasking.tasks

dispatch(func, args=None, kwargs=None, task_group=None, exclusive_resources=None, shared_resources=None, immediate=False, deferred=True, versions=None)

Enqueue a message to Pulp workers with a reservation.

This method provides normal enqueue functionality, while also requesting necessary locks for serialized urls. No two tasks that claim the same resource can execute concurrently. It accepts resources which it transforms into a list of urls (one for each resource).

This method creates a pulpcore.app.models.Task object and returns it.

The values in args and kwargs must be JSON serializable, but may contain instances of uuid.UUID.
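
One way to check ahead of time that a set of arguments meets this constraint is to serialize them with an encoder that accepts `uuid.UUID`. The sketch below uses only the standard library; `UUIDEncoder` and `check_serializable` are hypothetical names, and the encoder pulpcore itself uses is not specified here.

```python
import json
import uuid


class UUIDEncoder(json.JSONEncoder):
    """JSON encoder that additionally accepts uuid.UUID values."""

    def default(self, o):
        if isinstance(o, uuid.UUID):
            return str(o)
        # Anything else unsupported raises TypeError in the base class.
        return super().default(o)


def check_serializable(args=None, kwargs=None):
    """Return the JSON text for args/kwargs, or raise TypeError."""
    payload = {"args": list(args or ()), "kwargs": kwargs or {}}
    return json.dumps(payload, cls=UUIDEncoder)
```

Calling `check_serializable((some_uuid,), {"retries": 3})` succeeds, whereas passing an arbitrary object such as an open file raises `TypeError`.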

Parameters:

  • func (callable | str) –

    The function to be run when the necessary locks are acquired.

  • args (tuple, default: None ) –

    The positional arguments to pass on to the task.

  • kwargs (dict, default: None ) –

    The keyword arguments to pass on to the task.

  • task_group (TaskGroup, default: None ) –

    A TaskGroup to add the created Task to.

  • exclusive_resources (list, default: None ) –

    A list of resources this task needs exclusive access to while running. Each resource can be either a str or a django.db.models.Model instance.

  • shared_resources (list, default: None ) –

    A list of resources this task needs non-exclusive access to while running. Each resource can be either a str or a django.db.models.Model instance.

  • immediate (bool, default: False ) –

    Whether to allow running this task immediately. The task must be guaranteed to execute quickly and without blocking. If not all resource constraints are met, the task will either be returned in a canceled state or, if deferred is True, be left in the queue to be picked up by a worker eventually.

  • deferred (bool, default: True ) –

    Whether to allow deferring the task to a pulpcore_worker. immediate and deferred cannot both be False.

  • versions (Optional[Dict[str, str]], default: None ) –

    Minimum versions of components by app_label the worker must provide to handle the task.

Returns (pulpcore.app.models.Task): The Pulp Task that was created.

Raises:

  • ValueError

    When a resource is of an unsupported type.
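
To make the difference between exclusive_resources and shared_resources concrete, the toy model below decides whether two tasks could run at the same time. It assumes reader-writer style semantics (shared claims coexist, an exclusive claim conflicts with any other claim on the same resource), which is an interpretation of the parameter descriptions above and not pulpcore's scheduling code. The `conflicts` function, the dict layout, and the resource strings are hypothetical.

```python
def conflicts(a, b):
    """Return True if tasks ``a`` and ``b`` must not run concurrently."""
    a_ex, a_sh = set(a.get("exclusive", ())), set(a.get("shared", ()))
    b_ex, b_sh = set(b.get("exclusive", ())), set(b.get("shared", ()))
    # An exclusive claim collides with any claim on the same resource;
    # two shared claims on the same resource are compatible.
    return bool(a_ex & (b_ex | b_sh)) or bool(b_ex & a_sh)


# Hypothetical resource names, for illustration only.
sync = {"exclusive": ["repo:1"], "shared": ["remote:7"]}
publish = {"shared": ["repo:1"]}
export = {"shared": ["repo:1"]}

sync_vs_publish = conflicts(sync, publish)      # exclusive vs shared
publish_vs_export = conflicts(publish, export)  # shared vs shared
```

In this model `sync_vs_publish` is `True` and `publish_vs_export` is `False`, so the two read-only tasks may overlap while the task holding the exclusive claim runs alone.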

cancel_task(task_id)

Cancel the task that is represented by the given task_id.

This method cancels only the task with the given task_id, not the tasks it spawned. It also updates the task's state to 'canceling'.

Parameters:

  • task_id (str) –

    The ID of the task you wish to cancel.

Raises:

  • NotFound

    If a task with the given task_id does not exist.

cancel_task_group(task_group_id)

Cancel the task group that is represented by the given task_group_id.

This method attempts to cancel all tasks in the task group.

Parameters:

  • task_group_id (str) –

    The ID of the task group you wish to cancel.

Raises:

  • DoesNotExist

    If a task group with the given task_group_id does not exist.

pulp.tasking._util

pulpcore.tasking._util

delete_incomplete_resources(task)

Delete all incomplete created-resources on a canceled task.

Parameters:

  • task (Task) –

    A task.

perform_task(task_pk, task_working_dir_rel_path)

Set up the environment to handle a task and execute it. This must be called in a subprocess while the parent holds the advisory lock of the task.