Adding vulnerability reports to plugins

Warning

This feature is provided as a tech preview and could change in backwards incompatible ways in the future.

Pulp provides a way to store known vulnerabilities from OSV for the content units in a specified RepositoryVersion. Each plugin needs to implement a function that constructs the package payload used to query the osv.dev database.

Note

As of now, querying osv.dev by commit is not supported; query by package instead.

The first step in writing a vulnerability report for a Pulp content unit is to identify the package ecosystem by checking https://ossf.github.io/osv-schema/#defined-ecosystems.

The next step is to create an async function at the top level of the module (so that pulpcore can load it); it will be run as a Pulp task. The function should be an async generator that yields, for each content unit, a dictionary containing the osv_data (built by the _build_osv_data function in the following sample) together with the Content and RepositoryVersion objects.

Here is an example of a function with the above steps:

from asgiref.sync import sync_to_async
from pulpcore.plugin.models import RepositoryVersion
from pulpcore.plugin.sync import sync_to_async_iterable
from myplugin.app.models import MyPluginContent

async def get_content_from_repo_version(repo_version_pk: str):
    repo_version = await sync_to_async(RepositoryVersion.objects.get)(pk=repo_version_pk)
    content_units = MyPluginContent.objects.filter(pk__in=repo_version.content)
    ecosystem = "MyContentUnitEcosystem"  # Ecosystem name as defined by OSV (e.g. "PyPI" for Python content units)
    async for content in sync_to_async_iterable(content_units):
        repo_content_osv_data = _build_osv_data(content.name, ecosystem, content.version)
        repo_content_osv_data["repo_version"] = repo_version
        repo_content_osv_data["content"] = content
        yield repo_content_osv_data

def _build_osv_data(name, ecosystem, version=None):
    osv_data = {"package": {"name": name, "ecosystem": ecosystem}}
    if version:
        osv_data["version"] = version
    return osv_data
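
To make the shape of the yielded dictionaries concrete, here is a minimal sketch that can be run without Pulp. The `fake_content_generator` function and the placeholder strings are hypothetical stand-ins for the queryset and model instances in the sample above; only `_build_osv_data` is taken from it.

```python
import asyncio
import json


def _build_osv_data(name, ecosystem, version=None):
    # Same helper as in the sample above, repeated so this sketch runs on its own.
    osv_data = {"package": {"name": name, "ecosystem": ecosystem}}
    if version:
        osv_data["version"] = version
    return osv_data


async def fake_content_generator():
    # Hypothetical stand-in for get_content_from_repo_version: plain tuples
    # replace the content queryset, and strings replace the model instances.
    packages = [("requests", "2.31.0"), ("django", None)]
    for name, version in packages:
        data = _build_osv_data(name, "PyPI", version)
        data["repo_version"] = "<RepositoryVersion>"
        data["content"] = f"<Content {name}>"
        yield data


async def collect():
    # Drain the async generator into a list, one dictionary per content unit.
    return [item async for item in fake_content_generator()]


items = asyncio.run(collect())
print(json.dumps(items, indent=2))
```

Each dictionary carries the "package" entry (and "version" when one is known) alongside the "repo_version" and "content" keys. A unit without a version simply omits the "version" key.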

Now that we have the async generator function, we need to add a method to the plugin's RepositoryVersionViewSet subclass (the plugin class that inherits from core.RepositoryVersionViewSet) that dispatches the vulnerability report task.

Note

In the following sample, we are not defining the permissions required to access the endpoint. Plugin writers should define them according to each plugin's needs.

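from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action

from pulpcore.plugin import viewsets as core_viewsets
from pulpcore.plugin.serializers import AsyncOperationResponseSerializer
from pulpcore.plugin.tasking import check_content, dispatch

# get_content_from_repo_version (the async generator defined above) and
# MyPluginRepositoryViewSet are assumed to be defined or imported in this module.
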
class MyPluginRepositoryVersionViewSet(core_viewsets.RepositoryVersionViewSet):
    parent_viewset = MyPluginRepositoryViewSet

    @extend_schema(summary="Generate vulnerability report", responses={202: AsyncOperationResponseSerializer})
    @action(detail=True, methods=["post"])
    def scan(self, request, repository_pk, **kwargs):
        repository_version = self.get_object()
        # Dotted path ("<module>.<function>") that lets pulpcore load the generator function.
        func = f"{get_content_from_repo_version.__module__}.{get_content_from_repo_version.__name__}"
        task = dispatch(
            check_content,
            shared_resources=[repository_version.repository],
            args=[func, [repository_version.pk]],
        )
        return core_viewsets.OperationPostponedResponse(task, request)
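
The `func` argument is passed as a dotted-path string rather than as the function object, which is why the generator has to live at the top level of its module. The sketch below illustrates the idea with the standard library only. The `resolve` helper is hypothetical and assumes an importlib-based lookup; it is not pulpcore's actual loading code.

```python
import importlib
from json import dumps  # any top-level function works as a stand-in


def dotted_path(func):
    # Same construction as in the viewset sample: "<module>.<function name>".
    return f"{func.__module__}.{func.__name__}"


def resolve(path):
    # Hypothetical resolver: import everything before the last dot as a
    # module, then look up the final segment as an attribute of that module.
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


path = dotted_path(dumps)
print(path)
loaded = resolve(path)
print(loaded is dumps)
```

A function nested inside another function or a class would not be found this way, because its name is not an attribute of the module.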