
pulpcore.plugin.stages

Plugin writers can use the Stages API to create a high-performance, download-and-saving pipeline to make writing sync code easier. There are several parts to the API:

  1. DeclarativeVersion, a generic pipeline useful for most synchronization use cases.
  2. The built-in stages, including the artifact stages and content stages.
  3. The Stages API, which allows you to build custom stages and pipelines.

DeclarativeVersion

pulpcore.plugin.stages.DeclarativeVersion(first_stage, repository, mirror=False, acs=False)

A pipeline that creates a new pulpcore.plugin.models.RepositoryVersion from a stream of pulpcore.plugin.stages.DeclarativeContent objects.

The plugin writer needs to specify a first_stage that will create a pulpcore.plugin.stages.DeclarativeContent object for each Content unit that should exist in the pulpcore.plugin.models.RepositoryVersion.

The pipeline stages perform the following steps by default:

  1. Create the new pulpcore.plugin.models.RepositoryVersion
  2. Use the provided first_stage to construct pulpcore.plugin.stages.DeclarativeContent
  3. Query existing artifacts to determine which are already local to Pulp with pulpcore.plugin.stages.QueryExistingArtifacts
  4. Download any undownloaded pulpcore.plugin.models.Artifact objects with pulpcore.plugin.stages.ArtifactDownloader
  5. Save the newly downloaded pulpcore.plugin.models.Artifact objects with pulpcore.plugin.stages.ArtifactSaver
  6. Query for Content units already present in Pulp with pulpcore.plugin.stages.QueryExistingContents
  7. Save new Content units not yet present in Pulp with pulpcore.plugin.stages.ContentSaver
  8. Attach pulpcore.plugin.models.RemoteArtifact to the pulpcore.plugin.models.Content via pulpcore.plugin.stages.RemoteArtifactSaver
  9. Resolve the attached asyncio.Future of pulpcore.plugin.stages.DeclarativeContent with pulpcore.plugin.stages.ResolveContentFutures
  10. Associate all content units with the new pulpcore.plugin.models.RepositoryVersion with pulpcore.plugin.stages.ContentAssociation
  11. Unassociate any content units not declared in the stream (only when mirror=True) with pulpcore.plugin.stages.ContentUnassociation

To do this, the plugin writer should subclass the pulpcore.plugin.stages.Stage class and implement its async run() method. This coroutine should download metadata, create the corresponding pulpcore.plugin.stages.DeclarativeContent objects, and put them into the asyncio.Queue via put() to send them down the pipeline. For example:

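from pulpcore.plugin.models import Artifact
from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, Stage


class MyFirstStage(Stage):

    def __init__(self, remote):
        super().__init__()
        self.remote = remote

    async def run(self):
        # Fetch the repository metadata from the remote
        downloader = self.remote.get_downloader(url=self.remote.url)
        result = await downloader.run()
        # read_my_metadata_file_somehow(), MyContent and the entry attributes are placeholders
        for entry in read_my_metadata_file_somehow(result.path):
            unit = MyContent(entry)  # make the content unit in memory-only
            artifact = Artifact(size=entry.size, sha256=entry.sha256)  # in memory-only
            da = DeclarativeArtifact(
                artifact=artifact,
                url=entry.url,
                relative_path=entry.relative_path,
                remote=self.remote,
            )
            dc = DeclarativeContent(content=unit, d_artifacts=[da])
            await self.put(dc)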
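The flow of a first stage can be tried out without a Pulp installation. The sketch below is a self-contained model, not pulpcore code: ToyDeclarativeArtifact, ToyDeclarativeContent and ToyFirstStage are hypothetical stand-ins, the JSON metadata format is invented, and an unbounded asyncio.Queue replaces the queue the real pipeline wires up between stages.

```python
import asyncio
import json
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyDeclarativeArtifact:  # hypothetical stand-in for DeclarativeArtifact
    url: str
    relative_path: str


@dataclass
class ToyDeclarativeContent:  # hypothetical stand-in for DeclarativeContent
    content: dict
    d_artifacts: List[ToyDeclarativeArtifact] = field(default_factory=list)


class ToyFirstStage:
    """Turns an invented JSON metadata document into one item per entry."""

    def __init__(self, metadata_text, out_q):
        self.metadata_text = metadata_text
        self._out_q = out_q

    async def put(self, item):
        await self._out_q.put(item)

    async def run(self):
        for entry in json.loads(self.metadata_text):
            da = ToyDeclarativeArtifact(url=entry["url"], relative_path=entry["path"])
            dc = ToyDeclarativeContent(content={"name": entry["name"]}, d_artifacts=[da])
            await self.put(dc)  # hand the item to the next stage


async def main():
    metadata = json.dumps([
        {"name": "foo", "url": "https://example.com/foo.tar", "path": "foo.tar"},
        {"name": "bar", "url": "https://example.com/bar.tar", "path": "bar.tar"},
    ])
    out_q = asyncio.Queue()  # unbounded, so run() finishes even without a consumer
    await ToyFirstStage(metadata, out_q).run()
    return [out_q.get_nowait() for _ in range(out_q.qsize())]


emitted = asyncio.run(main())
```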

To use your first stage with the pipeline you have to instantiate the subclass and pass it to pulpcore.plugin.stages.DeclarativeVersion.

  1. Create the instance of the subclassed pulpcore.plugin.stages.Stage object.
  2. Create the pulpcore.plugin.stages.DeclarativeVersion instance, passing the pulpcore.plugin.stages.Stage subclass instance to it
  3. Call the create() method on your pulpcore.plugin.stages.DeclarativeVersion instance

Here is an example:

first_stage = MyFirstStage(remote)
DeclarativeVersion(first_stage, repository).create()

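Parameters:

  • first_stage (pulpcore.plugin.stages.Stage) –

    The first stage, which creates the pulpcore.plugin.stages.DeclarativeContent objects sent down the pipeline.

  • repository (pulpcore.plugin.models.Repository) –

    The repository that receives the new pulpcore.plugin.models.RepositoryVersion.

  • mirror (bool, default: False) –

    If True, content units not declared in the stream are removed from the new version. If False, content is only added.

  • acs (bool, default: False) –

    Whether to use Alternate Content Sources (ACS) when locating artifacts.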

pipeline_stages(new_version)

Build the list of pipeline stages feeding into the ContentAssociation stage.

Plugin-writers may override this method to build a custom pipeline. This can be achieved by returning a list with different stages or by extending the list returned by this method.
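A minimal sketch of the "extend the list" approach, using stand-in classes: ToyStage and ToyDeclarativeVersion are hypothetical, the default list is an abbreviated model of the default steps, and MyValidationStage is an invented custom stage. In a real plugin the subclass would derive from pulpcore.plugin.stages.DeclarativeVersion and call super().pipeline_stages(new_version) in the same way.

```python
class ToyStage:
    """Hypothetical placeholder for a Stage instance."""

    def __init__(self, name):
        self.name = name


class ToyDeclarativeVersion:
    """Hypothetical stand-in that only models the pipeline_stages() hook."""

    def __init__(self, first_stage):
        self.first_stage = first_stage

    def pipeline_stages(self, new_version):
        # Abbreviated default order, roughly following the default steps
        return [
            self.first_stage,
            ToyStage("QueryExistingArtifacts"),
            ToyStage("ArtifactDownloader"),
            ToyStage("ArtifactSaver"),
            ToyStage("QueryExistingContents"),
            ToyStage("ContentSaver"),
            ToyStage("RemoteArtifactSaver"),
            ToyStage("ResolveContentFutures"),
        ]


class MyDeclarativeVersion(ToyDeclarativeVersion):
    def pipeline_stages(self, new_version):
        # Extend the default list instead of rebuilding it from scratch
        pipeline = super().pipeline_stages(new_version)
        # Run a hypothetical validation stage right after the first stage
        pipeline.insert(1, ToyStage("MyValidationStage"))
        return pipeline


names = [s.name for s in MyDeclarativeVersion(ToyStage("MyFirstStage")).pipeline_stages(None)]
```

Extending the returned list keeps the subclass working if the default stages change, while returning a brand-new list gives full control over the order.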

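Parameters:

  • new_version (pulpcore.plugin.models.RepositoryVersion) –

    The repository version being created.

Returns:

  • A list of pulpcore.plugin.stages.Stage instances.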

create()

Perform the work. This is the long-blocking call where all syncing occurs.

Returns: The created RepositoryVersion or None if it represents no change from the latest.

pulpcore.plugin.stages.DeclarativeArtifact(artifact=None, url=None, urls=None, relative_path=None, remote=None, extra_data=None, deferred_download=False)

Relates a pulpcore.plugin.models.Artifact, how to download it, and its relative_path used later during publishing.

This is used by the Stages API stages to determine if a pulpcore.plugin.models.Artifact is already present and to ensure Pulp can download it in the future. The artifact can be either saved or unsaved. If unsaved, its attributes may be incomplete because not all digest information can be computed until the pulpcore.plugin.models.Artifact is downloaded.

Attributes:

  • url (str) –

    The URL to fetch the pulpcore.plugin.models.Artifact from.

  • urls (List[str]) –

    A list of many possible URLs to fetch the pulpcore.plugin.models.Artifact from.

  • relative_path (str) –

    The relative path at which this pulpcore.plugin.models.Artifact should be published for any Publication.

  • extra_data (dict) –

    A dictionary for storing additional data.

  • deferred_download (bool) –

    Whether to defer downloading and saving this artifact in the artifact stages (for on-demand content). Defaults to False. See the on-demand support documentation.

Raises:

  • ValueError

    If artifact, url, or relative_path are not specified. If remote is not
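The documented ValueError conditions can be modeled with a dataclass. This is a hypothetical stand-in, not pulpcore's DeclarativeArtifact: it checks only that an artifact, at least one of url or urls, and a relative_path are given, and it leaves out the remote condition, whose description is incomplete here. Because urls sits between url and relative_path in the signature, passing these arguments by keyword avoids shifting values into the wrong parameters.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ToyDeclarativeArtifact:
    """Hypothetical stand-in mirroring the documented ValueError conditions."""

    artifact: Optional[object] = None
    url: Optional[str] = None
    urls: List[str] = field(default_factory=list)
    relative_path: Optional[str] = None

    def __post_init__(self):
        # Validate eagerly so a bad declaration fails before entering the pipeline
        if self.artifact is None:
            raise ValueError("an artifact is required")
        if not (self.url or self.urls):
            raise ValueError("url or urls is required")
        if not self.relative_path:
            raise ValueError("relative_path is required")
```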

pulpcore.plugin.stages.DeclarativeContent(content=None, d_artifacts=None, extra_data=None)

Relates a Content unit and zero or more pulpcore.plugin.stages.DeclarativeArtifact objects.

This is used by the Stages API stages to determine if a Content unit is already present and ensure all of its associated pulpcore.plugin.stages.DeclarativeArtifact objects are related correctly. The content can be either saved or unsaved depending on where in the Stages API pipeline this is used.

Attributes:

  • content (subclass of pulpcore.plugin.models.Content) –

    A Content unit, possibly unsaved.

  • d_artifacts (list) –

    A list of zero or more pulpcore.plugin.stages.DeclarativeArtifact objects associated with content.

  • extra_data (dict) –

    A dictionary for storing additional data.

Raises:

  • ValueError

    If content is not specified.

Stages API

pulpcore.plugin.stages.create_pipeline(stages, maxsize=1) async

A coroutine that builds a Stages API linear pipeline from the list of stages and runs it.

Each stage is an instance of a class derived from pulpcore.plugin.stages.Stage that implements the run coroutine. This coroutine reads asynchronously from either the items() iterator or the batches() iterator and outputs items with put(). Here is an example of the simplest stage, which only passes data through:

class MyStage(Stage):
    async def run(self):
        async for d_content in self.items():  # Fetch items from the previous stage
            await self.put(d_content)  # Hand them over to the next stage

Parameters:

  • stages (list of pulpcore.plugin.stages.Stage) –

    A list of Stages API compatible stage instances.

  • maxsize (int, default: 1 ) –

    The maximum number of items a queue between two stages should hold. Optional and defaults to 1.

Returns:

  • A single coroutine that can be used to run, await, or cancel the entire pipeline.

Raises:

  • ValueError

    When a stage instance is specified more than once.
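To see how a linear pipeline fits together, here is a simplified, self-contained model of create_pipeline built on asyncio queues. ToyStage, toy_create_pipeline, run_and_close and the example stages are hypothetical; in particular, using None as the end-of-stream marker is this sketch's own convention (which is also why the toy put() rejects None, matching the documented ValueError).

```python
import asyncio


class ToyStage:
    """Simplified stand-in for pulpcore.plugin.stages.Stage."""

    _in_q = None
    _out_q = None

    async def items(self):
        # None marks the end of the stream in this sketch
        while (item := await self._in_q.get()) is not None:
            yield item

    async def put(self, item):
        if item is None:
            raise ValueError("item must not be None")
        await self._out_q.put(item)

    async def run(self):
        raise NotImplementedError

    async def run_and_close(self):
        await self.run()
        if self._out_q is not None:
            await self._out_q.put(None)  # tell the next stage the stream is over


async def toy_create_pipeline(stages, maxsize=1):
    if len({id(stage) for stage in stages}) != len(stages):
        raise ValueError("a stage instance was specified more than once")
    in_q, tasks = None, []
    for index, stage in enumerate(stages):
        # Every stage except the last gets a bounded output queue
        out_q = asyncio.Queue(maxsize=maxsize) if index < len(stages) - 1 else None
        stage._in_q, stage._out_q = in_q, out_q
        tasks.append(asyncio.ensure_future(stage.run_and_close()))
        in_q = out_q
    await asyncio.gather(*tasks)


class Numbers(ToyStage):
    async def run(self):
        for n in range(5):
            await self.put(n)


class Double(ToyStage):
    async def run(self):
        async for n in self.items():
            await self.put(n * 2)


class Collect(ToyStage):
    """Plays the role of EndStage by draining the last queue."""

    def __init__(self):
        self.seen = []

    async def run(self):
        async for n in self.items():
            self.seen.append(n)


collector = Collect()
asyncio.run(toy_create_pipeline([Numbers(), Double(), collector]))
```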

pulpcore.plugin.stages.Stage()

The base class for all Stages API stages.

To make a stage, inherit from this class and implement run() on the subclass.

run() async

The coroutine that is run as part of this stage.

Returns:

  • The coroutine that runs this stage.

items() async

Asynchronous iterator yielding items of DeclarativeContent from self._in_q.

The iterator yields instances of DeclarativeContent one by one as they become available.

Yields:

  • An instance of DeclarativeContent

Examples:

Used in stages to get d_content instances one by one from self._in_q:

class MyStage(Stage):
    async def run(self):
        async for d_content in self.items():
            # process declarative content
            await self.put(d_content)

batches(minsize=500) async

Asynchronous iterator yielding batches of DeclarativeContent from self._in_q.

The iterator tries to get as many instances of DeclarativeContent as possible without blocking, but at least minsize instances.

Parameters:

  • minsize (int, default: 500 ) –

    The minimum batch size to yield (unless it is the final batch).

Yields:

  • A list of DeclarativeContent instances

Examples:

Used in stages to get large chunks of d_content instances from self._in_q:

class MyStage(Stage):
    async def run(self):
        async for batch in self.batches():
            for d_content in batch:
                # process declarative content
                await self.put(d_content)
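A sketch of the batching rule, assuming a toy convention in which None ends the stream: block until at least minsize items are collected, then keep taking items with get_nowait() until the queue is momentarily empty. toy_batches and demo are hypothetical helpers, not pulpcore's implementation.

```python
import asyncio

END = None  # end-of-stream marker used by this sketch


async def toy_batches(in_q, minsize=500):
    batch = []
    while True:
        if len(batch) < minsize:
            item = await in_q.get()  # block until the minimum size is reached
        else:
            try:
                item = in_q.get_nowait()  # take more only if it is already waiting
            except asyncio.QueueEmpty:
                yield batch
                batch = []
                continue
        if item is END:
            if batch:
                yield batch  # the final batch may be smaller than minsize
            return
        batch.append(item)


async def demo(n=12, minsize=5):
    q = asyncio.Queue()

    async def produce():
        for i in range(n):
            await q.put(i)
            await asyncio.sleep(0)  # give the consumer a chance to run
        await q.put(END)

    async def consume():
        return [batch async for batch in toy_batches(q, minsize)]

    _, batches = await asyncio.gather(produce(), consume())
    return batches


result = asyncio.run(demo())
```

The exact batch sizes depend on scheduling, but every batch except the last holds at least minsize items and nothing is lost or reordered.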

put(item) async

Coroutine to pass items to the next stage.

Parameters:

  • item (pulpcore.plugin.stages.DeclarativeContent) –

    The item to hand over to the next stage.

Raises:

  • ValueError

    When item is None.

pulpcore.plugin.stages.EndStage()

Bases: Stage

A Stages API stage that drains incoming items and does nothing with them. This stage is required at the end of every pipeline.

Without this stage, the last stage's _out_q could fill up to its maxsize and block the entire pipeline.
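The blocking problem is easy to reproduce with a bounded asyncio.Queue. In this hypothetical sketch, producer plays the last real stage and drain plays EndStage; without a consumer, the second put() on a queue with maxsize=1 never returns, which the timeout detects.

```python
import asyncio


async def producer(out_q, n):
    # Plays the last real stage: it keeps putting items into its output queue
    for i in range(n):
        await out_q.put(i)  # waits whenever the queue already holds maxsize items


async def drain(in_q, n):
    # Plays EndStage: take items off the queue and discard them
    for _ in range(n):
        await in_q.get()


async def run_pipeline(with_drain, n=3):
    q = asyncio.Queue(maxsize=1)
    work = [producer(q, n)]
    if with_drain:
        work.append(drain(q, n))
    try:
        await asyncio.wait_for(asyncio.gather(*work), timeout=0.5)
        return "finished"
    except asyncio.TimeoutError:
        return "blocked"
```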

pulpcore.plugin.stages.ArtifactDownloader(max_concurrent_content=200, *args, **kwargs)

Bases: GenericDownloader

A Stages API stage that downloads pulpcore.plugin.models.Artifact files but does not save the pulpcore.plugin.models.Artifact in the db.

This stage downloads the file for any pulpcore.plugin.models.Artifact objects missing files and creates a new pulpcore.plugin.models.Artifact object from the downloaded file and its digest data. The new pulpcore.plugin.models.Artifact is not saved but added to the pulpcore.plugin.stages.DeclarativeArtifact object, replacing the likely incomplete pulpcore.plugin.models.Artifact.

Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after all of its pulpcore.plugin.stages.DeclarativeArtifact objects have been handled.
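One common way to cap concurrency in asyncio is a semaphore. The sketch below illustrates that general technique under stated assumptions and is not pulpcore's ArtifactDownloader: fake_download is a hypothetical stand-in for a real downloader, and each toy content unit holds the semaphore while all of its artifacts are fetched, so a unit only counts as finished once every artifact has been handled.

```python
import asyncio


async def fake_download(url):
    # Hypothetical download: pretend to fetch the file and return a local path
    await asyncio.sleep(0.01)
    return "/tmp/" + url.rsplit("/", 1)[-1]


async def download_contents(contents, max_concurrent_content=200):
    """contents: one list of artifact URLs per (toy) content unit."""
    semaphore = asyncio.Semaphore(max_concurrent_content)
    stats = {"active": 0, "peak": 0}

    async def handle(urls):
        async with semaphore:  # at most max_concurrent_content units in flight
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
            # Fetch all artifacts of this unit concurrently; the unit is done
            # only when every one of them has been handled
            paths = await asyncio.gather(*(fake_download(u) for u in urls))
            stats["active"] -= 1
            return paths

    results = await asyncio.gather(*(handle(urls) for urls in contents))
    return results, stats["peak"]


contents = [
    [f"https://example.com/c{i}-a.rpm", f"https://example.com/c{i}-b.rpm"] for i in range(10)
]
results, peak = asyncio.run(download_contents(contents, max_concurrent_content=3))
```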

pulpcore.plugin.stages.ArtifactSaver()

Bases: Stage

A Stages API stage that saves any unsaved DeclarativeArtifact.artifact objects.

This stage expects pulpcore.plugin.stages.DeclarativeContent units from self._in_q and inspects their associated pulpcore.plugin.stages.DeclarativeArtifact objects. Each pulpcore.plugin.stages.DeclarativeArtifact object stores one pulpcore.plugin.models.Artifact.

Any unsaved pulpcore.plugin.models.Artifact objects are saved. Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after all of its pulpcore.plugin.stages.DeclarativeArtifact objects have been handled.

This stage drains all available items from self._in_q and batches everything into one large call to the db for efficiency.
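The benefit of draining items into one database call can be shown with a call counter. ToyArtifact and ToyDB are hypothetical; ToyDB.bulk_create only imitates the shape of Django's QuerySet.bulk_create, and the sketch does not claim that ArtifactSaver issues exactly this call.

```python
from dataclasses import dataclass


@dataclass
class ToyArtifact:
    sha256: str
    saved: bool = False


class ToyDB:
    """Hypothetical database that counts round trips."""

    def __init__(self):
        self.calls = 0
        self.rows = []

    def bulk_create(self, artifacts):
        self.calls += 1
        for artifact in artifacts:
            artifact.saved = True
        self.rows.extend(artifacts)


def save_batch(batch, db):
    # batch: one list of artifacts per content unit
    unsaved = [a for artifacts in batch for a in artifacts if not a.saved]
    if unsaved:
        db.bulk_create(unsaved)  # one call for the whole batch, not one per artifact
    return batch
```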

run() async

The coroutine for this stage.

Returns:

  • The coroutine for this stage.

pulpcore.plugin.stages.RemoteArtifactSaver(fix_mismatched_remote_artifacts=False, *args, **kwargs)

Bases: Stage

A Stage that saves pulpcore.plugin.models.RemoteArtifact objects.

A pulpcore.plugin.models.RemoteArtifact object is saved for each pulpcore.plugin.stages.DeclarativeArtifact.

run() async

The coroutine for this stage.

Returns:

  • The coroutine for this stage.

pulpcore.plugin.stages.QueryExistingArtifacts()

Bases: Stage

A Stages API stage that replaces unsaved DeclarativeArtifact.artifact objects with already-saved pulpcore.plugin.models.Artifact objects.

This stage expects pulpcore.plugin.stages.DeclarativeContent units from self._in_q and inspects their associated pulpcore.plugin.stages.DeclarativeArtifact objects. Each pulpcore.plugin.stages.DeclarativeArtifact object stores one pulpcore.plugin.models.Artifact.

This stage inspects any unsaved pulpcore.plugin.models.Artifact objects and searches using their metadata for existing saved pulpcore.plugin.models.Artifact objects inside Pulp with the same digest value(s). Any existing pulpcore.plugin.models.Artifact objects found will replace their unsaved counterpart in the pulpcore.plugin.stages.DeclarativeArtifact object.

Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after all of its pulpcore.plugin.stages.DeclarativeArtifact objects have been handled.

This stage drains all available items from self._in_q and batches everything into one large call to the db for efficiency.
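A simplified model of the digest lookup, assuming artifacts are matched on sha256 alone (the real stage may compare several digests). ToyArtifact and query_existing are hypothetical, and a pk of None stands for "not saved yet".

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyArtifact:
    sha256: str
    pk: Optional[int] = None  # None means "not saved yet"


def query_existing(artifacts, saved_by_digest):
    """Swap unsaved artifacts for saved ones that share the same sha256."""
    # Collect the digests of the whole batch so they can be looked up together
    wanted = {a.sha256 for a in artifacts if a.pk is None}
    found = {d: saved_by_digest[d] for d in wanted if d in saved_by_digest}
    # Unsaved artifacts without a saved counterpart are kept; they still need downloading
    return [found.get(a.sha256, a) if a.pk is None else a for a in artifacts]
```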

run() async

The coroutine for this stage.

Returns:

  • The coroutine for this stage.

pulpcore.plugin.stages.ContentSaver()

Bases: Stage

A Stages API stage that saves DeclarativeContent.content objects and their related pulpcore.plugin.models.ContentArtifact objects.

This stage expects pulpcore.plugin.stages.DeclarativeContent units from self._in_q and inspects their associated pulpcore.plugin.stages.DeclarativeArtifact objects. Each pulpcore.plugin.stages.DeclarativeArtifact object stores one pulpcore.plugin.models.Artifact.

Each unsaved Content object is saved, along with its pulpcore.plugin.models.ContentArtifact objects.

Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after it has been handled.

This stage drains all available items from self._in_q and batches everything into one large call to the db for efficiency.

run() async

The coroutine for this stage.

Returns:

  • The coroutine for this stage.

pulpcore.plugin.stages.QueryExistingContents()

Bases: Stage

A Stages API stage that replaces unsaved DeclarativeContent.content objects with already-saved Content units that have the same unit key.

This stage expects pulpcore.plugin.stages.DeclarativeContent units from self._in_q and inspects their associated pulpcore.plugin.stages.DeclarativeArtifact objects. Each pulpcore.plugin.stages.DeclarativeArtifact object stores one pulpcore.plugin.models.Artifact.

This stage inspects any unsaved Content unit objects and searches for existing saved Content units inside Pulp with the same unit key. Any existing Content objects found replace their unsaved counterparts in the pulpcore.plugin.stages.DeclarativeContent object.

Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after it has been handled.

This stage drains all available items from self._in_q and batches everything into one large call to the db for efficiency.

run() async

The coroutine for this stage.

Returns:

  • The coroutine for this stage.

pulpcore.plugin.stages.ResolveContentFutures()

Bases: Stage

This stage resolves the futures in pulpcore.plugin.stages.DeclarativeContent.

Future results are set to the found or created pulpcore.plugin.models.Content.

This is useful when data downloaded from the plugin API needs to be parsed by FirstStage to create additional pulpcore.plugin.stages.DeclarativeContent objects to be sent down the pipeline. For example, suppose content type Foo references additional instances of a different content type Bar, and FirstStage contains this code:

# Create d_content and d_artifact for a `foo_a`
foo_a = DeclarativeContent(...)
# Send it in the pipeline
await self.put(foo_a)

...

foo_a_content = await foo_a.resolution()  # awaits until the foo_a reaches this stage

This creates a "looping" pattern, of sorts, where downloaded content at the end of the pipeline can introduce additional to-be-downloaded content at the beginning of the pipeline. However, it can substantially degrade the performance of batching content in the earlier stages. If you want to drop a declarative content from the pipeline prematurely, call resolve() on it to unblock the coroutines awaiting the attached future, and do not hand it to the next stage. As a rule of thumb, it is better to send more items into the pipeline first and await their resolution later.
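The looping pattern can be modeled with a plain asyncio future. In this hypothetical sketch, ToyDeclarativeContent only imitates the resolve()/resolution() pair, and resolving_stage stands in for the part of the pipeline that ends with ResolveContentFutures. The first stage awaits Foo's resolution before emitting the Bar units it references.

```python
import asyncio


class ToyDeclarativeContent:
    """Hypothetical stand-in carrying a future that a later stage resolves."""

    def __init__(self, content):
        self.content = content
        self._future = asyncio.get_running_loop().create_future()

    def resolve(self):
        if not self._future.done():
            self._future.set_result(self.content)

    async def resolution(self):
        return await self._future


async def first_stage(out_q):
    foo = ToyDeclarativeContent({"name": "foo", "refs": ["bar1", "bar2"]})
    await out_q.put(foo)
    saved_foo = await foo.resolution()  # waits until the final stage resolves foo
    # Use the resolved Foo to declare the Bar units it references
    for ref in saved_foo["refs"]:
        await out_q.put(ToyDeclarativeContent({"name": ref, "refs": []}))
    await out_q.put(None)  # end of stream (this sketch's convention)


async def resolving_stage(in_q, seen):
    while (dc := await in_q.get()) is not None:
        dc.content["saved"] = True  # pretend the earlier stages saved it
        seen.append(dc.content["name"])
        dc.resolve()


async def demo():
    q = asyncio.Queue(maxsize=1)
    seen = []
    await asyncio.gather(first_stage(q), resolving_stage(q, seen))
    return seen


seen = asyncio.run(demo())
```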

run() async

The coroutine for this stage.

Returns:

  • The coroutine for this stage.

pulpcore.plugin.stages.ContentAssociation(new_version, mirror, *args, **kwargs)

Bases: Stage

A Stages API stage that associates content units with new_version.

This stage stores all content unit primary keys in memory before running. This is done to compute the units already associated but not received from self._in_q. These units are passed via self._out_q to the next stage as a django.db.models.query.QuerySet.

This stage creates a ProgressReport named 'Associating Content' that counts the number of units associated. Since it's a stream, the total count isn't known until it's finished.

If mirror was enabled, content units may also be un-associated (removed) from new_version. A ProgressReport named 'Un-Associating Content' is created that counts the number of units un-associated.
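The effect of mirror on the resulting version comes down to set arithmetic over primary keys. toy_association is a hypothetical helper that models only the outcome, not how ContentAssociation or ContentUnassociation query the database.

```python
def toy_association(existing_pks, streamed_pks, mirror):
    """Return (to_add, to_remove, final) primary-key sets for a new version."""
    existing, streamed = set(existing_pks), set(streamed_pks)
    to_add = streamed - existing
    # Only mirror mode removes units that never showed up in the stream
    to_remove = existing - streamed if mirror else set()
    final = (existing | to_add) - to_remove
    return to_add, to_remove, final
```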

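Parameters:

  • new_version (pulpcore.plugin.models.RepositoryVersion) –

    The repository version that content units are associated with.

  • mirror (bool) –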

    Whether or not to "mirror" the stream of DeclarativeContent - whether content not in the stream should be removed from the repository.

  • args

    unused positional arguments passed along to pulpcore.plugin.stages.Stage.

  • kwargs

    unused keyword arguments passed along to pulpcore.plugin.stages.Stage.

run() async

The coroutine for this stage.

Returns:

  • The coroutine for this stage.