8000 Add Postgres Live V2 Importer Pipeline by michaelehab · Pull Request #1982 · aboutcode-org/vulnerablecode · GitHub
[go: up one dir, main page]

Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
from vulnerabilities.pipelines.v2_importers import nvd_importer as nvd_importer_v2
from vulnerabilities.pipelines.v2_importers import oss_fuzz as oss_fuzz_v2
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
from vulnerabilities.pipelines.v2_importers import (
postgresql_live_importer as postgresql_live_importer_v2,
)
from vulnerabilities.pipelines.v2_importers import pypa_importer as pypa_importer_v2
from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2
from vulnerabilities.pipelines.v2_importers import redhat_importer as redhat_importer_v2
Expand Down Expand Up @@ -117,3 +120,9 @@
oss_fuzz.OSSFuzzImporter,
]
)

LIVE_IMPORTERS_REGISTRY = create_registry(
[
postgresql_live_importer_v2.PostgreSQLLiveImporterPipeline,
]
)
102 changes: 102 additions & 0 deletions vulnerabilities/pipelines/v2_importers/postgresql_live_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
#

import logging
from typing import Iterable

from packageurl import PackageURL
from univers.versions import GenericVersion
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines.v2_importers.postgresql_importer import PostgreSQLImporterPipeline

logger = logging.getLogger(__name__)


class PostgreSQLLiveImporterPipeline(PostgreSQLImporterPipeline):
"""
Live importer for PostgreSQL that filters the batch output to a single PURL.
"""

pipeline_id = "postgresql_live_importer_v2"
supported_types = ["generic"]

@classmethod
def steps(cls):
return (
cls.get_purl_inputs,
cls.collect_and_store_advisories,
)

def get_purl_inputs(self):
purl = self.inputs.get("purl")
if not purl:
raise ValueError("PURL is required for PostgreSQLLiveImporterPipeline")

if isinstance(purl, str):
purl = PackageURL.from_string(purl)

if not isinstance(purl, PackageURL):
raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

if purl.type not in self.supported_types:
raise ValueError(
f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
)

if purl.name != "postgresql":
raise ValueError(f"PURL: {purl!s} is expected to be for 'postgresql'")

if not purl.version:
raise ValueError(f"PURL: {purl!s} is expected to have a version")

self.purl = purl

def collect_advisories(self) -> Iterable[AdvisoryData]:
for advisory in super().collect_advisories():
if self._advisory_affects_purl(advisory):
yield advisory

def _advisory_affects_purl(self, advisory: AdvisoryData) -> bool:
if not advisory.affected_packages:
return False

try:
package_semver_version = SemverVersion(self.purl.version)
package_generic_version = GenericVersion(self.purl.version)
except Exception as e:
logger.debug(f"Invalid PURL version {self.purl.version!r}: {e}")
return False

for ap in advisory.affected_packages:
if ap.package.type != "generic" or ap.package.name != "postgresql":
continue

purl_q = self.purl.qualifiers or None
ap_q = ap.package.qualifiers or None

if purl_q is None and ap_q is None:
qualifiers_match = True
else:
qualifiers_match = all(ap_q.get(k) == v for k, v in purl_q.items())

if not qualifiers_match:
continue

try:
if getattr(ap, "affected_version_range", None):
if package_semver_version in ap.affected_version_range:
return True
elif getattr(ap, "fixed_version", None):
if package_generic_version < ap.fixed_version:
return True
except Exception as e:
logger.debug(f"Version comparison failed for {package_semver_version}: {e}")
continue

return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
#

import pytest
import requests
from packageurl import PackageURL

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines.v2_importers.postgresql_live_importer import (
PostgreSQLLiveImporterPipeline,
)

HTML_BASE = """
<html>
<body>
<table>
<tbody>
<tr>
<td>
<span class="nobr"><a href="/support/security/CVE-2022-1234/">CVE-2022-1234</a></span><br>
<a href="/about/news/postgresql-175-169-1513-1418-and-1321-released-3072/">Announcement</a><br>
</td>
<td>{affected}</td>
<td>{fixed}</td>
<td><a href="/vector?vector=CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H">9.8</a></td>
<td>{summary}</td>
</tr>
</tbody>
</table>
</body>
</html>
"""


class DummyResponse:
def __init__(self, content):
self.content = content.encode("utf-8")


def test_affected_version(monkeypatch):
html = HTML_BASE.format(affected="10.0, 10.1", fixed="10.2", summary="Issue affects all")
monkeypatch.setattr(requests, "get", lambda url: DummyResponse(html))

purl = PackageURL(type="generic", name="postgresql", version="10.1")
pipeline = PostgreSQLLiveImporterPipeline(purl=purl)
pipeline.get_purl_inputs()
advisories = list(pipeline.collect_advisories())

assert len(advisories) == 1
adv = advisories[0]
assert isinstance(adv, AdvisoryData)
assert adv.advisory_id == "CVE-2022-1234"


def test_unaffected_version(monkeypatch):
html = HTML_BASE.format(affected="10.0, 10.1", fixed="10.2", summary="Issue affects all")
monkeypatch.setattr(requests, "get", lambda url: DummyResponse(html))

purl = PackageURL(type="generic", name="postgresql", version="10.2")
pipeline = PostgreSQLLiveImporterPipeline(purl=purl)
pipeline.get_purl_inputs()
advisories = list(pipeline.collect_advisories())

assert len(advisories) == 0


def test_qualifier_filtering(monkeypatch):
html = HTML_BASE.format(affected="12.0, 12.1", fixed="12.2", summary="Windows-specific issue")
monkeypatch.setattr(requests, "get", lambda url: DummyResponse(html))

purl = PackageURL(
type="generic", name="postgresql", version="12.1", qualifiers={"os": "windows"}
)
pipeline = PostgreSQLLiveImporterPipeline(purl=purl)
pipeline.get_purl_inputs()
advisories = list(pipeline.collect_advisories())
assert len(advisories) == 1

purl = PackageURL(type="generic", name="postgresql", version="12.1", qualifiers={"os": "linux"})
pipeline = PostgreSQLLiveImporterPipeline(purl=purl)
pipeline.get_purl_inputs()
advisories = list(pipeline.collect_advisories())
assert len(advisories) == 0


def test_invalid_purl():
pipeline = PostgreSQLLiveImporterPipeline()

pipeline.inputs = {"purl": "pkg:pypi/postgresql@10.1"}
with pytest.raises(ValueError):
pipeline.get_purl_inputs()

pipeline.inputs = {"purl": "pkg:generic/notpostgresql@10.1"}
with pytest.raises(ValueError):
pipeline.get_purl_inputs()

pipeline.inputs = {"purl": "pkg:generic/postgresql"}
with pytest.raises(ValueError):
pipeline.get_purl_inputs()
0