Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions vulnerabilities/management/commands/populate_severity_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from django.core.management.base import BaseCommand
from vulnerabilities.models import VulnerabilitySeverity, AdvisorySeverity

class Command(BaseCommand):
help = "Populate scoring_elements_data for VulnerabilitySeverity and AdvisorySeverity"

def handle(self, *args, **options):
self.stdout.write("Starting population of VulnerabilitySeverity...")
qs = VulnerabilitySeverity.objects.filter(scoring_elements__isnull=False)
count = qs.count()
self.stdout.write(f"Found {count} VulnerabilitySeverity records to process.")

for i, severity in enumerate(qs.iterator(chunk_size=1000), start=1):
severity.save()
if i % 1000 == 0:
self.stdout.write(f"Processed {i}/{count} VulnerabilitySeverity records...")

self.stdout.write("Starting population of AdvisorySeverity...")
qs = AdvisorySeverity.objects.filter(scoring_elements__isnull=False)
count = qs.count()
self.stdout.write(f"Found {count} AdvisorySeverity records to process.")

for i, severity in enumerate(qs.iterator(chunk_size=1000), start=1):
severity.save()
if i % 1000 == 0:
self.stdout.write(f"Processed {i}/{count} AdvisorySeverity records...")

self.stdout.write("Population completed.")
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 5.2.11 on 2026-02-15 20:41

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('vulnerabilities', '0112_alter_advisoryseverity_scoring_system_and_more'),
]

operations = [
migrations.AddField(
model_name='advisoryseverity',
name='scoring_elements_data',
field=models.JSONField(blank=True, default=dict, help_text='Serialized scoring elements data.'),
),
migrations.AddField(
model_name='vulnerabilityseverity',
name='scoring_elements_data',
field=models.JSONField(blank=True, default=dict, help_text='Serialized scoring elements data.'),
),
]
42 changes: 42 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,27 @@ class VulnerabilitySeverity(models.Model):
"For example a CVSS vector string as used to compute a CVSS score.",
)

scoring_elements_data = models.JSONField(
default=dict,
blank=True,
help_text="Serialized scoring elements data.",
)

def save(self, *args, **kwargs):
if self.scoring_elements:
try:
scoring_system = SCORING_SYSTEMS.get(self.scoring_system)
if scoring_system:
self.scoring_elements_data = scoring_system.get(self.scoring_elements)
except (
CVSS2MalformedError,
CVSS3MalformedError,
CVSS4MalformedError,
NotImplementedError,
):
pass
super().save(*args, **kwargs)

published_at = models.DateTimeField(
blank=True, null=True, help_text="UTC Date of publication of the vulnerability severity"
)
Expand Down Expand Up @@ -2579,6 +2600,27 @@ class AdvisorySeverity(models.Model):
"For example a CVSS vector string as used to compute a CVSS score.",
)

scoring_elements_data = models.JSONField(
default=dict,
blank=True,
help_text="Serialized scoring elements data.",
)

def save(self, *args, **kwargs):
if self.scoring_elements:
try:
scoring_system = SCORING_SYSTEMS.get(self.scoring_system)
if scoring_system:
self.scoring_elements_data = scoring_system.get(self.scoring_elements)
except (
CVSS2MalformedError,
CVSS3MalformedError,
CVSS4MalformedError,
NotImplementedError,
):
pass
super().save(*args, **kwargs)

published_at = models.DateTimeField(
blank=True, null=True, help_text="UTC Date of publication of the vulnerability severity"
)
Expand Down
31 changes: 31 additions & 0 deletions vulnerabilities/tests/test_cvss_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

from django.test import TestCase
from vulnerabilities.models import VulnerabilitySeverity, AdvisorySeverity
from vulnerabilities.severity_systems import SCORING_SYSTEMS

class TestSeverityStorage(TestCase):
def test_scoring_elements_data_population_vulnerability(self):
# CVSS v3.1 vector
vector = "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"
severity = VulnerabilitySeverity.objects.create(
scoring_system="cvssv3.1",
scoring_elements=vector,
value="9.8"
)
severity.refresh_from_db()
self.assertTrue(severity.scoring_elements_data)
self.assertEqual(severity.scoring_elements_data['version'], '3.1')
self.assertEqual(severity.scoring_elements_data['vectorString'], vector)

def test_scoring_elements_data_population_advisory(self):
# CVSS v2 vector
vector = "AV:N/AC:L/Au:N/C:P/I:P/A:P"
severity = AdvisorySeverity.objects.create(
scoring_system="cvssv2",
scoring_elements=vector,
value="7.5"
)
severity.refresh_from_db()
self.assertTrue(severity.scoring_elements_data)
self.assertEqual(severity.scoring_elements_data['version'], '2.0')
self.assertEqual(severity.scoring_elements_data['vectorString'], vector)
73 changes: 39 additions & 34 deletions vulnerabilities/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def get_queryset(self):
Prefetch(
"severities",
queryset=models.VulnerabilitySeverity.objects.only(
"scoring_system", "value", "url", "scoring_elements", "published_at"
"scoring_system", "value", "url", "scoring_elements", "scoring_elements_data", "published_at"
),
),
Prefetch(
Expand Down Expand Up @@ -347,21 +347,22 @@ def get_context_data(self, **kwargs):
severity_vectors = []

for severity in valid_severities:
try:
vector_values_system = SCORING_SYSTEMS[severity.scoring_system]
if not vector_values_system:
logging.error(f"Unknown scoring system: {severity.scoring_system}")
continue
vector_values = vector_values_system.get(severity.scoring_elements)
if vector_values:
severity_vectors.append({"vector": vector_values, "origin": severity.url})
except (
CVSS2MalformedError,
CVSS3MalformedError,
CVSS4MalformedError,
NotImplementedError,
):
logging.error(f"CVSSMalformedError for {severity.scoring_elements}")
vector_values = severity.scoring_elements_data
if not vector_values and severity.scoring_elements:
try:
vector_values_system = SCORING_SYSTEMS.get(severity.scoring_system)
if vector_values_system:
vector_values = vector_values_system.get(severity.scoring_elements)
except (
CVSS2MalformedError,
CVSS3MalformedError,
CVSS4MalformedError,
NotImplementedError,
):
logging.error(f"CVSSMalformedError for {severity.scoring_elements}")

if vector_values:
severity_vectors.append({"vector": vector_values, "origin": severity.url})

epss_severity = vulnerability.severities.filter(scoring_system="epss").first()
epss_data = None
Expand Down Expand Up @@ -427,7 +428,7 @@ def get_queryset(self):
Prefetch(
"severities",
queryset=models.AdvisorySeverity.objects.only(
"scoring_system", "value", "url", "scoring_elements", "published_at"
"scoring_system", "value", "url", "scoring_elements", "scoring_elements_data", "published_at"
),
),
Prefetch(
Expand Down Expand Up @@ -494,23 +495,27 @@ def get_context_data(self, **kwargs):
severity_vectors = []

for severity in valid_severities:
try:
vector_values_system = SCORING_SYSTEMS.get(severity.scoring_system)
if not vector_values_system:
logging.error(f"Unknown scoring system: {severity.scoring_system}")
continue
if vector_values_system.identifier in ["cvssv3.1_qr"]:
continue
vector_values = vector_values_system.get(severity.scoring_elements)
if vector_values:
severity_vectors.append({"vector": vector_values, "origin": severity.url})
except (
CVSS2MalformedError,
CVSS3MalformedError,
CVSS4MalformedError,
NotImplementedError,
):
logging.error(f"CVSSMalformedError for {severity.scoring_elements}")
vector_values_system = SCORING_SYSTEMS.get(severity.scoring_system)
if not vector_values_system:
logging.error(f"Unknown scoring system: {severity.scoring_system}")
continue
if vector_values_system.identifier in ["cvssv3.1_qr"]:
continue

vector_values = severity.scoring_elements_data
if not vector_values and severity.scoring_elements:
try:
vector_values = vector_values_system.get(severity.scoring_elements)
except (
CVSS2MalformedError,
CVSS3MalformedError,
CVSS4MalformedError,
NotImplementedError,
):
logging.error(f"CVSSMalformedError for {severity.scoring_elements}")

if vector_values:
severity_vectors.append({"vector": vector_values, "origin": severity.url})

def add_ssvc(ssvc):
key = (ssvc.vector, ssvc.source_advisory_id)
Expand Down