Adding vulnerability report for plugins
Warning
This feature is provided as a tech preview and could change in backwards incompatible ways in the future.
Pulp provides a way to store known vulnerabilities from OSV for content units within a specified RepositoryVersion.
Each plugin needs to implement a function that constructs the package payload
used to query the osv.dev database.
Note
As of now, querying osv.dev by commit is not supported (use package instead).
The first step in writing a vulnerability report for a Pulp content unit is to identify the
package ecosystem by checking https://ossf.github.io/osv-schema/#defined-ecosystems.
The next step is to create an async function at the top level of the module (so it can be
loaded in pulpcore) that will be run as a Pulp task. This function should be an async generator
that yields one dictionary per content unit. Each dictionary contains the osv_data (created by the
_build_osv_data function in the following sample), along with the Content and RepositoryVersion
objects.
Here is an example of a function with the above steps:
from asgiref.sync import sync_to_async

from pulpcore.plugin.models import RepositoryVersion
from pulpcore.plugin.sync import sync_to_async_iterable

from myplugin.app.models import MyPluginContent


async def get_content_from_repo_version(repo_version_pk: str):
    repo_version = await sync_to_async(RepositoryVersion.objects.get)(pk=repo_version_pk)
    content_units = MyPluginContent.objects.filter(pk__in=repo_version.content)
    # Content unit ecosystem from osv.dev (for example "PyPI" for Python content units)
    ecosystem = "MyContentUnitEcosystem"
    async for content in sync_to_async_iterable(content_units):
        repo_content_osv_data = _build_osv_data(content.name, ecosystem, content.version)
        repo_content_osv_data["repo_version"] = repo_version
        repo_content_osv_data["content"] = content
        yield repo_content_osv_data


def _build_osv_data(name, ecosystem, version=None):
    osv_data = {"package": {"name": name, "ecosystem": ecosystem}}
    if version:
        osv_data["version"] = version
    return osv_data
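To make the shape of the yielded dictionaries concrete, here is a minimal, self-contained sketch that runs without Pulp. It reuses `_build_osv_data` from the sample above. `fake_content_generator`, the package names, the versions, and the placeholder string standing in for the RepositoryVersion are all hypothetical and only serve as illustration.

```python
import asyncio


def _build_osv_data(name, ecosystem, version=None):
    # Same helper as in the sample above.
    osv_data = {"package": {"name": name, "ecosystem": ecosystem}}
    if version:
        osv_data["version"] = version
    return osv_data


async def fake_content_generator():
    # Hypothetical stand-in for get_content_from_repo_version: plain tuples
    # replace the Content rows and a string replaces the RepositoryVersion.
    for name, version in [("django", "4.2.0"), ("requests", None)]:
        osv_data = _build_osv_data(name, "PyPI", version)
        osv_data["repo_version"] = "<RepositoryVersion placeholder>"
        osv_data["content"] = name
        yield osv_data


async def collect():
    # Drain the async generator with an async comprehension.
    return [item async for item in fake_content_generator()]


results = asyncio.run(collect())
```

Each item carries the `package` key (and `version` when one is known) next to the `repo_version` and `content` entries. When no version is passed, the `version` key is omitted entirely.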
Now that we have the async generator function, we need to create a new method in the plugin's
RepositoryVersionViewSet subclass (the plugin class that inherits from core.RepositoryVersionViewSet)
that will be used to dispatch the vulnerability report task.
Note
In the following sample, we are not defining the permissions to access the endpoint. Plugin writers should define them according to each plugin's needs.
from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action

from pulpcore.plugin import viewsets as core_viewsets
from pulpcore.plugin.serializers import AsyncOperationResponseSerializer
from pulpcore.plugin.tasking import check_content, dispatch

# get_content_from_repo_version and MyPluginRepositoryViewSet are assumed to be
# defined in, or imported from, your plugin.


class MyPluginRepositoryVersionViewSet(core_viewsets.RepositoryVersionViewSet):
    parent_viewset = MyPluginRepositoryViewSet

    @extend_schema(
        summary="Generate vulnerability report",
        responses={202: AsyncOperationResponseSerializer},
    )
    @action(detail=True, methods=["post"])
    def scan(self, request, repository_pk, **kwargs):
        repository_version = self.get_object()
        # Dotted path to the async generator, passed to the task as a string.
        func = f"{get_content_from_repo_version.__module__}.{get_content_from_repo_version.__name__}"
        task = dispatch(
            check_content,
            shared_resources=[repository_version.repository],
            args=[func, [repository_version.pk]],
        )
        return core_viewsets.OperationPostponedResponse(task, request)
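The viewset passes the generator to the task as a dotted-path string rather than as a function object. The following sketch, which uses only the standard library, illustrates how such a path can be built and resolved back to a callable. `dotted_path` and `resolve_dotted_path` are hypothetical helpers written for this illustration. They are not pulpcore's actual loading code, and `json.dumps` merely stands in for the plugin function.

```python
import importlib
import json


def dotted_path(func):
    # Build "package.module.function" the same way the viewset sample does.
    return f"{func.__module__}.{func.__name__}"


def resolve_dotted_path(path):
    # Hypothetical helper: split off the attribute name, import the module,
    # and look the attribute up at the top level of that module.
    module_name, attr_name = path.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


path = dotted_path(json.dumps)
resolved = resolve_dotted_path(path)
```

A lookup of this kind only finds names defined at module top level. A function nested inside another function or a class would not be reachable through `getattr` on the module, which is consistent with the requirement stated earlier that the async function live at the top level.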