pulpcore.plugin.stages¶
Plugin writers can use the Stages API to create a high-performance download-and-save pipeline that makes writing sync code easier. There are several parts to the API:
- DeclarativeVersion is a generic pipeline useful for most synchronization use cases.
- The builtin Stages, including the Artifact Related Stages and the Content Related Stages.
- The Stages API, which allows you to build custom stages and pipelines.
DeclarativeVersion¶
pulpcore.plugin.stages.DeclarativeVersion(first_stage, repository, mirror=False, acs=False)
¶
A pipeline that creates a new pulpcore.plugin.models.RepositoryVersion from a stream of pulpcore.plugin.stages.DeclarativeContent objects.
The plugin writer needs to specify a first_stage that will create a pulpcore.plugin.stages.DeclarativeContent object for each Content unit that should exist in the pulpcore.plugin.models.RepositoryVersion.
The pipeline stages perform the following steps by default:
- Create the new pulpcore.plugin.models.RepositoryVersion
- Use the provided first_stage to construct pulpcore.plugin.stages.DeclarativeContent
- Query existing artifacts to determine which are already local to Pulp with pulpcore.plugin.stages.QueryExistingArtifacts
- Download any undownloaded pulpcore.plugin.models.Artifact objects with pulpcore.plugin.stages.ArtifactDownloader
- Save the newly downloaded pulpcore.plugin.models.Artifact objects with pulpcore.plugin.stages.ArtifactSaver
- Query for Content units already present in Pulp with pulpcore.plugin.stages.QueryExistingContents
- Save new Content units not yet present in Pulp with pulpcore.plugin.stages.ContentSaver
- Attach pulpcore.plugin.models.RemoteArtifact to the pulpcore.plugin.models.Content via pulpcore.plugin.stages.RemoteArtifactSaver
- Resolve the attached asyncio.Future of pulpcore.plugin.stages.DeclarativeContent with pulpcore.plugin.stages.ResolveContentFutures
- Associate all content units with the new pulpcore.plugin.models.RepositoryVersion with pulpcore.plugin.stages.ContentAssociation
- Unassociate any content units not declared in the stream (only when mirror=True) with pulpcore.plugin.stages.ContentUnassociation
To do this, the plugin writer should subclass the pulpcore.plugin.stages.Stage class and define its run() interface, which returns a coroutine. This coroutine should download metadata, create the corresponding pulpcore.plugin.stages.DeclarativeContent objects, and put them into the asyncio.Queue via put() to send them down the pipeline. For example::
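    from pulpcore.plugin.models import Artifact
    from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, Stage


    class MyFirstStage(Stage):

        def __init__(self, remote):
            super().__init__()
            self.remote = remote

        async def run(self):
            # Download the metadata file from the remote
            downloader = self.remote.get_downloader(url=self.remote.url)
            result = await downloader.run()
            for entry in read_my_metadata_file_somehow(result.path):
                unit = MyContent(entry)  # make the content unit in memory-only
                artifact = Artifact(entry)  # make Artifact in memory-only
                # entry.url is assumed to hold the download URL of this entry
                da = DeclarativeArtifact(artifact, entry.url, entry.relative_path, self.remote)
                dc = DeclarativeContent(content=unit, d_artifacts=[da])
                await self.put(dc)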
To use your first stage with the pipeline you have to instantiate the subclass and pass it to pulpcore.plugin.stages.DeclarativeVersion.
- Create the instance of the subclassed pulpcore.plugin.stages.Stage object.
- Create the pulpcore.plugin.stages.DeclarativeVersion instance, passing the pulpcore.plugin.stages.Stage subclass instance to it.
- Call the create() method on your pulpcore.plugin.stages.DeclarativeVersion instance.
Here is an example::
    first_stage = MyFirstStage(remote)
    DeclarativeVersion(first_stage, repository).create()
Parameters:
- mirror (bool, default: False) – 'True' removes content units from the pulpcore.plugin.models.RepositoryVersion that are not requested in the pulpcore.plugin.stages.DeclarativeVersion stream. 'False' (additive) only adds content units observed in the pulpcore.plugin.stages.DeclarativeVersion stream, and does not remove any pre-existing units in the pulpcore.plugin.models.RepositoryVersion. 'False' is the default.
- acs (bool, default: False) – When set to 'True', a new stage is added to look for Alternate Content Sources.
pipeline_stages(new_version)
¶
Build the list of pipeline stages feeding into the ContentAssociation stage.
Plugin-writers may override this method to build a custom pipeline. This can be achieved by returning a list with different stages or by extending the list returned by this method.
Returns:
- list – List of pulpcore.plugin.stages.Stage instances
create()
¶
Perform the work. This is the long-blocking call where all syncing occurs.
Returns: The created RepositoryVersion or None if it represents no change from the latest.
pulpcore.plugin.stages.DeclarativeArtifact(artifact=None, url=None, urls=None, relative_path=None, remote=None, extra_data=None, deferred_download=False)
¶
Relates a pulpcore.plugin.models.Artifact, how to download it, and its relative_path used later during publishing.
This is used by the Stages API stages to determine if a pulpcore.plugin.models.Artifact is already present and ensure Pulp can download it in the future. The artifact can be either saved or unsaved. If unsaved, the artifact attributes may be incomplete because not all digest information can be computed until the pulpcore.plugin.models.Artifact is downloaded.
Attributes:
- url (str) – The URL to fetch the pulpcore.plugin.models.Artifact from.
- urls (List[str]) – A list of possible URLs to fetch the pulpcore.plugin.models.Artifact from.
- relative_path (str) – The relative_path this pulpcore.plugin.models.Artifact should be published at for any Publication.
- extra_data (dict) – A dictionary available for additional data to be stored in.
- deferred_download (bool) – Whether downloading and saving this artifact in the artifact stages should be deferred (skipped). Defaults to False. See on-demand-support.
Raises:
- ValueError – If artifact, url, or relative_path are not specified. If remote is not
pulpcore.plugin.stages.DeclarativeContent(content=None, d_artifacts=None, extra_data=None)
¶
Relates a Content unit and zero or more pulpcore.plugin.stages.DeclarativeArtifact objects.
This is used by the Stages API stages to determine if a Content unit is already present and ensure all of its associated pulpcore.plugin.stages.DeclarativeArtifact objects are related correctly. The content can be either saved or unsaved depending on where in the Stages API pipeline this is used.
Attributes:
- content (subclass of pulpcore.plugin.models.Content) – A Content unit, possibly unsaved.
- d_artifacts (list) – A list of zero or more pulpcore.plugin.stages.DeclarativeArtifact objects associated with content.
- extra_data (dict) – A dictionary available for additional data to be stored in.
Raises:
- ValueError – If content is not specified.
Stages API¶
pulpcore.plugin.stages.create_pipeline(stages, maxsize=1)
async
¶
A coroutine that builds a Stages API linear pipeline from the list stages and runs it.
Each stage is an instance of a class derived from pulpcore.plugin.stages.Stage that implements the run() coroutine. This coroutine reads asynchronously either from the items() iterator or the batches() iterator and outputs the items with put(). Here is an example of the simplest stage that only passes data::
    class MyStage(Stage):
        async def run(self):
            async for d_content in self.items():  # Fetch items from the previous stage
                await self.put(d_content)  # Hand them over to the next stage
Parameters:
- stages (list of coroutines) – A list of Stages API compatible coroutines.
- maxsize (int, default: 1) – The maximum number of items a queue between two stages should hold.
Returns:
- A single coroutine that can be used to run, wait for, or cancel the entire pipeline.
Raises:
- ValueError – When a stage instance is specified more than once.
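The idea of a linear pipeline whose stages are connected by bounded queues can be sketched with plain asyncio. This is only a toy model and not how pulpcore implements create_pipeline: the function names are hypothetical, and using None as the end-of-stream marker is an assumption made for the sketch.

```python
import asyncio


async def producer(out_q):
    # First stage: emit three items, then signal the end of the stream.
    for item in ("a", "b", "c"):
        await out_q.put(item)
    await out_q.put(None)


async def upper(in_q, out_q):
    # Middle stage: transform each item and pass it on.
    while (item := await in_q.get()) is not None:
        await out_q.put(item.upper())
    await out_q.put(None)


async def collect(in_q, results):
    # Last stage: drain the final queue so nothing upstream blocks.
    while (item := await in_q.get()) is not None:
        results.append(item)


async def run_toy_pipeline(maxsize=1):
    # One bounded queue sits between each pair of neighbouring stages.
    q1 = asyncio.Queue(maxsize=maxsize)
    q2 = asyncio.Queue(maxsize=maxsize)
    results = []
    await asyncio.gather(producer(q1), upper(q1, q2), collect(q2, results))
    return results


toy_results = asyncio.run(run_toy_pipeline())
```

With maxsize=1 each stage can run at most one item ahead of its consumer, which keeps memory use bounded at the cost of more task switching.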
pulpcore.plugin.stages.Stage()
¶
The base class for all Stages API stages.
To make a stage, inherit from this class and implement run() on the subclass.
run()
async
¶
The coroutine that is run as part of this stage.
Returns:
- The coroutine that runs this stage.
items()
async
¶
Asynchronous iterator yielding items of DeclarativeContent from self._in_q.
The iterator will get instances of DeclarativeContent one by one as they become available.
Yields:
- An instance of DeclarativeContent
Examples:
Used in stages to get d_content instances one by one from self._in_q::
    class MyStage(Stage):
        async def run(self):
            async for d_content in self.items():
                # process declarative content
                await self.put(d_content)
batches(minsize=500)
async
¶
Asynchronous iterator yielding batches of DeclarativeContent from self._in_q.
The iterator will try to get as many instances of DeclarativeContent as possible without blocking, but at least minsize instances.
Parameters:
- minsize (int, default: 500) – The minimum batch size to yield (unless it is the final batch)
Yields:
- A list of DeclarativeContent instances
Examples:
Used in stages to get large chunks of d_content instances from self._in_q::
    class MyStage(Stage):
        async def run(self):
            async for batch in self.batches():
                for d_content in batch:
                    # process declarative content
                    await self.put(d_content)
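The batching behaviour described above (wait only while the batch is too small, otherwise take whatever is already queued) could look roughly like the following. It is a simplified stand-in written against a bare asyncio.Queue, not the pulpcore implementation; toy_batches is a hypothetical name and None is assumed to mark the end of the stream.

```python
import asyncio


async def toy_batches(in_q, minsize=3):
    """Yield lists of items from in_q; None marks the end of the stream."""
    batch = []
    done = False
    while not done:
        # Block only while the batch is still smaller than minsize.
        item = await in_q.get()
        while True:
            if item is None:
                done = True
                break
            batch.append(item)
            try:
                # Grab whatever else is already queued without waiting.
                item = in_q.get_nowait()
            except asyncio.QueueEmpty:
                break
        if batch and (done or len(batch) >= minsize):
            yield batch
            batch = []


async def demo_batches():
    q = asyncio.Queue()
    for i in range(7):
        q.put_nowait(i)
    q.put_nowait(None)
    return [batch async for batch in toy_batches(q, minsize=3)]


toy_batch_result = asyncio.run(demo_batches())
```

Because all seven items are already queued when the iterator starts, they arrive as a single batch even though minsize is 3.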
put(item)
async
¶
Coroutine to pass items to the next stage.
Parameters:
- item – A handled instance of pulpcore.plugin.stages.DeclarativeContent
Raises:
- ValueError – When item is None.
pulpcore.plugin.stages.EndStage()
¶
Bases: Stage
A Stages API stage that drains incoming items and does nothing with the items. This is required at the end of all pipelines.
Without this stage, the last stage's _out_q could fill up to its maxsize and block the entire pipeline.
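Why a draining stage is needed can be shown with a bounded asyncio.Queue on its own. The sketch below is illustrative only: produce and drain are hypothetical helpers, and None is assumed as the end-of-stream marker.

```python
import asyncio


async def produce(out_q, count):
    for i in range(count):
        await out_q.put(i)  # waits whenever the queue is full
    await out_q.put(None)   # assumed end-of-stream marker


async def drain(in_q):
    # Toy counterpart of an end stage: consume and discard everything.
    while (await in_q.get()) is not None:
        pass


async def demo_end_stage():
    # Without a consumer the producer stalls once the queue is full.
    q = asyncio.Queue(maxsize=1)
    try:
        await asyncio.wait_for(produce(q, 3), timeout=0.1)
        stalled = False
    except asyncio.TimeoutError:
        stalled = True

    # With a draining consumer the same producer runs to completion.
    q = asyncio.Queue(maxsize=1)
    await asyncio.wait_for(asyncio.gather(produce(q, 3), drain(q)), timeout=1)
    return stalled


producer_stalled = asyncio.run(demo_end_stage())
```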
Artifact Related Stages¶
pulpcore.plugin.stages.ArtifactDownloader(max_concurrent_content=200, *args, **kwargs)
¶
Bases: GenericDownloader
A Stages API stage that downloads pulpcore.plugin.models.Artifact files but does not save the pulpcore.plugin.models.Artifact in the db.
This stage downloads the file for any pulpcore.plugin.models.Artifact objects missing files and creates a new pulpcore.plugin.models.Artifact object from the downloaded file and its digest data. The new pulpcore.plugin.models.Artifact is not saved but added to the pulpcore.plugin.stages.DeclarativeArtifact object, replacing the likely incomplete pulpcore.plugin.models.Artifact.
Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after all of its pulpcore.plugin.stages.DeclarativeArtifact objects have been handled.
pulpcore.plugin.stages.ArtifactSaver()
¶
Bases: Stage
A Stages API stage that saves any unsaved DeclarativeArtifact.artifact objects.
This stage expects pulpcore.plugin.stages.DeclarativeContent units from self._in_q and inspects their associated pulpcore.plugin.stages.DeclarativeArtifact objects. Each pulpcore.plugin.stages.DeclarativeArtifact object stores one pulpcore.plugin.models.Artifact.
Any unsaved pulpcore.plugin.models.Artifact objects are saved. Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after all of its pulpcore.plugin.stages.DeclarativeArtifact objects have been handled.
This stage drains all available items from self._in_q and batches everything into one large call to the db for efficiency.
run()
async
¶
The coroutine for this stage.
Returns:
- The coroutine for this stage.
pulpcore.plugin.stages.RemoteArtifactSaver(fix_mismatched_remote_artifacts=False, *args, **kwargs)
¶
Bases: Stage
A Stage that saves pulpcore.plugin.models.RemoteArtifact objects.
A pulpcore.plugin.models.RemoteArtifact object is saved for each pulpcore.plugin.stages.DeclarativeArtifact.
run()
async
¶
The coroutine for this stage.
Returns:
- The coroutine for this stage.
pulpcore.plugin.stages.QueryExistingArtifacts()
¶
Bases: Stage
A Stages API stage that replaces unsaved DeclarativeArtifact.artifact objects with already-saved pulpcore.plugin.models.Artifact objects.
This stage expects pulpcore.plugin.stages.DeclarativeContent units from self._in_q and inspects their associated pulpcore.plugin.stages.DeclarativeArtifact objects. Each pulpcore.plugin.stages.DeclarativeArtifact object stores one pulpcore.plugin.models.Artifact.
This stage inspects any unsaved pulpcore.plugin.models.Artifact objects and searches using their metadata for existing saved pulpcore.plugin.models.Artifact objects inside Pulp with the same digest value(s). Any existing pulpcore.plugin.models.Artifact objects found will replace their unsaved counterpart in the pulpcore.plugin.stages.DeclarativeArtifact object.
Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after all of its pulpcore.plugin.stages.DeclarativeArtifact objects have been handled.
This stage drains all available items from self._in_q and batches everything into one large call to the db for efficiency.
run()
async
¶
The coroutine for this stage.
Returns:
- The coroutine for this stage.
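The digest-based replacement performed by QueryExistingArtifacts can be pictured with a small in-memory model. Everything here is a stand-in for illustration: ToyArtifact, ToyDeclarativeArtifact, replace_with_existing and the dict-based store are hypothetical and replace the real models and database query.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class ToyArtifact:
    sha256: str
    saved: bool = False


@dataclass
class ToyDeclarativeArtifact:
    artifact: ToyArtifact


def replace_with_existing(d_artifacts, store):
    """Swap unsaved artifacts for saved ones with the same digest."""
    # Collect all digests first so a single batched lookup is enough.
    wanted = {da.artifact.sha256 for da in d_artifacts if not da.artifact.saved}
    found = {digest: store[digest] for digest in wanted if digest in store}
    for da in d_artifacts:
        # Keep the unsaved artifact when nothing with that digest exists.
        da.artifact = found.get(da.artifact.sha256, da.artifact)


known = hashlib.sha256(b"already in Pulp").hexdigest()
unknown = hashlib.sha256(b"new file").hexdigest()
store = {known: ToyArtifact(known, saved=True)}
batch = [
    ToyDeclarativeArtifact(ToyArtifact(known)),
    ToyDeclarativeArtifact(ToyArtifact(unknown)),
]
replace_with_existing(batch, store)
```

After the call, the first entry points at the saved artifact from the store, while the second still holds its unsaved artifact and would go on to be downloaded.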
Content Related Stages¶
pulpcore.plugin.stages.ContentSaver()
¶
Bases: Stage
A Stages API stage that saves DeclarativeContent.content objects and saves their related pulpcore.plugin.models.ContentArtifact objects too.
This stage expects pulpcore.plugin.stages.DeclarativeContent units from self._in_q and inspects their associated pulpcore.plugin.stages.DeclarativeArtifact objects. Each pulpcore.plugin.stages.DeclarativeArtifact object stores one pulpcore.plugin.models.Artifact.
Each "unsaved" Content object is saved, along with its pulpcore.plugin.models.ContentArtifact objects.
Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after it has been handled.
This stage drains all available items from self._in_q and batches everything into one large call to the db for efficiency.
run()
async
¶
The coroutine for this stage.
Returns:
- The coroutine for this stage.
pulpcore.plugin.stages.QueryExistingContents()
¶
Bases: Stage
A Stages API stage that replaces unsaved DeclarativeContent.content objects with already-saved Content objects.
This stage expects pulpcore.plugin.stages.DeclarativeContent units from self._in_q and inspects their associated pulpcore.plugin.stages.DeclarativeArtifact objects. Each pulpcore.plugin.stages.DeclarativeArtifact object stores one pulpcore.plugin.models.Artifact.
This stage inspects any "unsaved" Content unit objects and searches for existing saved Content units inside Pulp with the same unit key. Any existing Content objects found replace their "unsaved" counterpart in the pulpcore.plugin.stages.DeclarativeContent object.
Each pulpcore.plugin.stages.DeclarativeContent is sent to self._out_q after it has been handled.
This stage drains all available items from self._in_q and batches everything into one large call to the db for efficiency.
run()
async
¶
The coroutine for this stage.
Returns:
- The coroutine for this stage.
pulpcore.plugin.stages.ResolveContentFutures()
¶
Bases: Stage
This stage resolves the futures in pulpcore.plugin.stages.DeclarativeContent.
Future results are set to the found/created pulpcore.plugin.models.Content.
This is useful when data downloaded from the plugin API needs to be parsed by FirstStage to create additional pulpcore.plugin.stages.DeclarativeContent objects to be sent down the pipeline. Consider an example where content type Foo references additional instances of a different content type Bar. Consider this code in FirstStage::
    # Create d_content and d_artifact for a `foo_a`
    foo_a = DeclarativeContent(...)
    # Send it in the pipeline
    await self.put(foo_a)
    ...
    foo_a_content = await foo_a.resolution()  # awaits until the foo_a reaches this stage
This creates a "looping" pattern, of sorts, where downloaded content at the end of the pipeline can introduce additional to-be-downloaded content at the beginning of the pipeline. On the other hand, it can substantially reduce the performance gained from batching content in the earlier stages.
If you want to drop a declarative content prematurely from the pipeline, use the function resolve() to unblock the coroutines awaiting the attached future and do not hand the content to the next stage.
As a rule of thumb, sending more items into the pipeline first and awaiting their resolution later is better.
run()
async
¶
The coroutine for this stage.
Returns:
- The coroutine for this stage.
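The "looping" pattern rests on an asyncio.Future attached to each item. The following sketch models it with a hypothetical ToyDeclarativeContent class and two toy stages. It is not the pulpcore implementation; the string result and the None end marker are assumptions made for the example.

```python
import asyncio


class ToyDeclarativeContent:
    def __init__(self, name):
        self.name = name
        self._future = None
        self._resolved = False

    async def resolution(self):
        # Return immediately if a later stage already resolved this item.
        if self._resolved:
            return f"saved:{self.name}"
        if self._future is None:
            self._future = asyncio.get_running_loop().create_future()
        return await self._future

    def resolve(self):
        self._resolved = True
        if self._future is not None and not self._future.done():
            self._future.set_result(f"saved:{self.name}")


async def first_stage(out_q, log):
    foo_a = ToyDeclarativeContent("foo_a")
    await out_q.put(foo_a)                # send it down the pipeline
    log.append(await foo_a.resolution())  # resumes once it is resolved
    await out_q.put(None)                 # assumed end-of-stream marker


async def resolving_stage(in_q):
    # Toy counterpart of the resolving stage at the end of the pipeline.
    while (item := await in_q.get()) is not None:
        item.resolve()


async def demo_futures():
    q, log = asyncio.Queue(maxsize=1), []
    await asyncio.gather(first_stage(q, log), resolving_stage(q))
    return log


resolved_log = asyncio.run(demo_futures())
```

Awaiting the resolution right after each put, as done here, serializes the pipeline. Putting several items first and awaiting them afterwards lets the intermediate stages batch their work.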
pulpcore.plugin.stages.ContentAssociation(new_version, mirror, *args, **kwargs)
¶
Bases: Stage
A Stages API stage that associates content units with new_version.
This stage stores all content unit primary keys in memory before running. This is done to compute the units already associated but not received from self._in_q. These units are passed via self._out_q to the next stage as a django.db.models.query.QuerySet.
This stage creates a ProgressReport named 'Associating Content' that counts the number of units associated. Since it's a stream, the total count isn't known until it's finished.
If mirror was enabled, then content units may also be un-associated (removed) from new_version. A ProgressReport named 'Un-Associating Content' is created that counts the number of units un-associated.
Parameters:
- mirror (bool) – Whether or not to "mirror" the stream of DeclarativeContent, that is, whether content not in the stream should be removed from the repository.
- args – unused positional arguments passed along to pulpcore.plugin.stages.Stage.
- kwargs – unused keyword arguments passed along to pulpcore.plugin.stages.Stage.
run()
async
¶
The coroutine for this stage.
Returns:
- The coroutine for this stage.
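The bookkeeping behind ContentAssociation (hold the existing primary keys in memory, tick them off as the stream arrives, and treat the remainder as candidates for removal) can be sketched with sets. The function associate_stream is a hypothetical illustration that uses plain integers as primary keys. The real stage works with querysets and database writes.

```python
def associate_stream(existing_pks, incoming_pks, mirror=False):
    """Return (to_add, to_remove) for a toy repository version."""
    existing = set(existing_pks)
    # Keys already associated but not (yet) seen in the stream.
    leftover = set(existing)
    to_add = set()
    for pk in incoming_pks:
        leftover.discard(pk)
        if pk not in existing:
            to_add.add(pk)
    # Only mirror mode removes units that never showed up in the stream.
    to_remove = leftover if mirror else set()
    return to_add, to_remove


additive = associate_stream({1, 2, 3}, [2, 3, 4])
mirrored = associate_stream({1, 2, 3}, [2, 3, 4], mirror=True)
```

In both modes unit 4 is added. Only the mirrored call also reports unit 1 for removal, because it was associated before but never appeared in the stream.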