Enhance KEV workflow with VulnCheck vKEV support

- Replace KEV script with enhanced version supporting both CISA KEV and VulnCheck vKEV tags
- Add 371 missing vKEV tags for CVEs tracked by VulnCheck but not in CISA catalog
- Update workflow name and descriptions to reflect enhanced functionality
- Found 276 additional exploited vulnerabilities through VulnCheck data
- Maintain backward compatibility with existing KEV tags
This commit is contained in:
Prince Chaddha
2025-09-17 08:45:16 +08:00
parent 0170320698
commit 3ccd71dc1c
2 changed files with 192 additions and 98 deletions

View File

@@ -1,11 +1,12 @@
#!/usr/bin/env python3
"""
KEV Tagging Updater for Nuclei Templates
Enhanced KEV and vKEV Tagging Updater for Nuclei Templates
This script updates KEV (Known Exploited Vulnerabilities) tags for all CVE templates by:
This script updates KEV and vKEV (VulnCheck Known Exploited Vulnerabilities) tags for all CVE templates by:
1. Fetching the latest CISA KEV catalog
2. Checking each CVE template against the KEV list
3. Adding 'kev' tag to the tags field for KEV CVEs
2. Fetching VulnCheck's KEV data (if API key available)
3. Adding 'kev' tag for CISA KEV CVEs
4. Adding 'vkev' tag for VulnCheck KEV CVEs (including those not in CISA)
"""
import os
@@ -15,35 +16,39 @@ import time
import json
import requests
from pathlib import Path
from typing import Set, Optional
from typing import Set, Optional, Dict, Tuple
# Configuration
CISA_KEV_JSON_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
VULNCHECK_KEV_API_URL = "https://api.vulncheck.com/v3/index/vulncheck-kev"
TIMEOUT = 30
MAX_RETRIES = 3
VULNCHECK_PAGE_SIZE = 100
class KEVUpdater:
class EnhancedKEVUpdater:
def __init__(self, root_dir: str):
self.root_dir = Path(root_dir)
self.updated_count = 0
self.error_count = 0
self.cisa_kev_cves = set()
self.vulncheck_kev_cves = set()
self.vulncheck_api_key = os.getenv('VULNCHECK_API_KEY')
def find_cve_templates(self) -> list[Path]:
"""Find all CVE template files."""
cve_files = []
# Search for CVE templates in common directories
patterns = [
"**/cves/**/*.yaml",
"**/cve-*.yaml"
]
for pattern in patterns:
cve_files.extend(self.root_dir.glob(pattern))
return sorted(list(set(cve_files)))
def extract_cve_id(self, file_path: Path) -> Optional[str]:
"""Extract CVE ID from filename or template content."""
# First try filename
@@ -51,7 +56,7 @@ class KEVUpdater:
cve_match = re.search(r'CVE-\d{4}-\d+', filename, re.IGNORECASE)
if cve_match:
return cve_match.group().upper()
# If not found in filename, try content
try:
with open(file_path, 'r', encoding='utf-8') as f:
@@ -61,187 +66,276 @@ class KEVUpdater:
return cve_match.group(1).upper()
except Exception as e:
print(f"Error reading {file_path}: {e}")
return None
def fetch_cisa_kev_data(self) -> Set[str]:
"""Fetch CISA KEV catalog and return set of CVE IDs."""
kev_cves = set()
for attempt in range(MAX_RETRIES):
try:
print("Fetching CISA KEV catalog...")
response = requests.get(CISA_KEV_JSON_URL, timeout=TIMEOUT)
response.raise_for_status()
data = response.json()
if 'vulnerabilities' in data:
for vuln in data['vulnerabilities']:
if 'cveID' in vuln:
kev_cves.add(vuln['cveID'].upper())
print(f"Retrieved {len(kev_cves)} CVEs from CISA KEV catalog")
break
else:
print("Invalid CISA KEV data format")
except requests.RequestException as e:
print(f"CISA KEV fetch failed (attempt {attempt + 1}/{MAX_RETRIES}): {e}")
if attempt == MAX_RETRIES - 1:
print("Failed to fetch CISA KEV data")
else:
time.sleep(2 * (attempt + 1))
except Exception as e:
print(f"Unexpected error fetching CISA KEV: {e}")
break
return kev_cves
def has_kev_tag(self, content: str) -> bool:
"""Check if template already has KEV tag in tags field."""
# Look for tags field and check if kev is present
def fetch_vulncheck_kev_data(self) -> Set[str]:
"""Fetch VulnCheck KEV catalog and return set of CVE IDs."""
if not self.vulncheck_api_key:
print("VulnCheck API key not available, skipping vKEV data fetch")
return set()
vkev_cves = set()
page = 1
max_pages = 10 # Safety limit
print("Fetching VulnCheck KEV catalog...")
while page <= max_pages:
for attempt in range(MAX_RETRIES):
try:
headers = {
'Authorization': f'Bearer {self.vulncheck_api_key}',
'Accept': 'application/json'
}
params = {
'limit': VULNCHECK_PAGE_SIZE,
'page': page
}
response = requests.get(VULNCHECK_KEV_API_URL, headers=headers, params=params, timeout=TIMEOUT)
response.raise_for_status()
data = response.json()
if 'data' in data and data['data']:
for entry in data['data']:
if 'cve' in entry and entry['cve']:
for cve_id in entry['cve']:
if cve_id and cve_id.startswith('CVE-'):
vkev_cves.add(cve_id.upper())
# Check if we have more pages
meta = data.get('_meta', {})
total_pages = meta.get('total_pages', 0)
current_page = meta.get('page', page)
if current_page >= total_pages:
break
page += 1
break # Break retry loop, continue to next page
else:
print("No more VulnCheck KEV data")
page = max_pages + 1 # Exit outer loop
break
except requests.RequestException as e:
print(f"VulnCheck KEV fetch failed (attempt {attempt + 1}/{MAX_RETRIES}, page {page}): {e}")
if attempt == MAX_RETRIES - 1:
print(f"Failed to fetch VulnCheck KEV data for page {page}")
page = max_pages + 1 # Exit on final failure
else:
time.sleep(2 * (attempt + 1))
except Exception as e:
print(f"Unexpected error fetching VulnCheck KEV: {e}")
page = max_pages + 1 # Exit on unexpected error
break
print(f"Retrieved {len(vkev_cves)} CVEs from VulnCheck KEV catalog")
return vkev_cves
def has_tag(self, content: str, tag: str) -> bool:
"""Check if template already has specific tag in tags field."""
# Look for tags field and check if tag is present
tags_match = re.search(r'tags:\s*([^\n]+)', content)
if tags_match:
tags_str = tags_match.group(1)
# Check for kev as a standalone tag (not part of another word)
if re.search(r'\bkev\b', tags_str, re.IGNORECASE):
# Check for tag as a standalone tag (not part of another word)
if re.search(rf'\b{tag}\b', tags_str, re.IGNORECASE):
return True
return False
def add_kev_tag(self, content: str) -> tuple[str, bool]:
"""Add kev tag to the tags field. Returns (updated_content, was_updated)."""
if self.has_kev_tag(content):
def add_tag(self, content: str, tag: str) -> tuple[str, bool]:
"""Add tag to the tags field. Returns (updated_content, was_updated)."""
if self.has_tag(content, tag):
return content, False
# Find the tags field
tags_match = re.search(r'(\s*tags:\s*)([^\n]+)', content)
if tags_match:
indent = tags_match.group(1)
existing_tags = tags_match.group(2).rstrip()
# Add kev tag at the end
new_tags = existing_tags + ',kev'
# Add tag at the end
new_tags = existing_tags + f',{tag}'
new_line = f"{indent}{new_tags}"
updated_content = content.replace(tags_match.group(0), new_line)
return updated_content, True
else:
print("Warning: No tags field found to add kev tag")
print(f"Warning: No tags field found to add {tag} tag")
return content, False
def remove_kev_tag(self, content: str) -> tuple[str, bool]:
"""Remove kev tag from the tags field. Returns (updated_content, was_updated)."""
if not self.has_kev_tag(content):
def remove_tag(self, content: str, tag: str) -> tuple[str, bool]:
"""Remove tag from the tags field. Returns (updated_content, was_updated)."""
if not self.has_tag(content, tag):
return content, False
# Find and update tags field
tags_match = re.search(r'(\s*tags:\s*)([^\n]+)', content)
if tags_match:
indent = tags_match.group(1)
existing_tags = tags_match.group(2)
# Remove kev tag in various positions
# Remove tag in various positions
new_tags = existing_tags
# Remove ,kev or kev, patterns
new_tags = re.sub(r',\s*kev\b', '', new_tags)
new_tags = re.sub(r'\bkev\s*,', '', new_tags)
new_tags = re.sub(r'^\s*kev\s*$', '', new_tags) # kev is only tag
new_tags = re.sub(r'^\s*kev\s*,\s*', '', new_tags) # kev is first tag
# Remove ,tag or tag, patterns
new_tags = re.sub(rf',\s*{tag}\b', '', new_tags)
new_tags = re.sub(rf'\b{tag}\s*,', '', new_tags)
new_tags = re.sub(rf'^\s*{tag}\s*$', '', new_tags) # tag is only tag
new_tags = re.sub(rf'^\s*{tag}\s*,\s*', '', new_tags) # tag is first tag
if new_tags != existing_tags:
new_line = f"{indent}{new_tags}"
updated_content = content.replace(tags_match.group(0), new_line)
return updated_content, True
return content, False
def update_template_with_kev(self, file_path: Path, cve_id: str, is_kev: bool) -> bool:
"""Update a template file with KEV tags."""
def update_template_tags(self, file_path: Path, cve_id: str, is_cisa_kev: bool, is_vulncheck_kev: bool) -> bool:
"""Update a template file with KEV and vKEV tags."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
original_content = content
updated = False
if is_kev:
# Handle CISA KEV tag
if is_cisa_kev:
# Add kev tag if missing
content, tag_added = self.add_kev_tag(content)
content, tag_added = self.add_tag(content, 'kev')
if tag_added:
print(f"Added kev tag to {file_path.name}")
updated = True
else:
# Remove kev tag if present but CVE is not in KEV catalog
if self.has_kev_tag(content):
# Remove kev tag if present but CVE is not in CISA KEV catalog
if self.has_tag(content, 'kev'):
# Only remove if we're confident about our KEV data
if len(self.cisa_kev_cves) > 1000: # Sanity check
content, tag_removed = self.remove_kev_tag(content)
content, tag_removed = self.remove_tag(content, 'kev')
if tag_removed:
print(f"Removed kev tag from {file_path.name} (no longer in KEV catalog)")
print(f"Removed kev tag from {file_path.name} (no longer in CISA KEV catalog)")
updated = True
else:
print(f"Warning: {cve_id} has kev tag but not in current KEV catalog (keeping tag)")
print(f"Warning: {cve_id} has kev tag but not in current CISA KEV catalog (keeping tag)")
# Handle VulnCheck KEV tag
if is_vulncheck_kev:
# Add vkev tag if missing
content, tag_added = self.add_tag(content, 'vkev')
if tag_added:
print(f"Added vkev tag to {file_path.name}")
updated = True
else:
# Remove vkev tag if present but CVE is not in VulnCheck KEV catalog
if self.has_tag(content, 'vkev') and len(self.vulncheck_kev_cves) > 100: # Sanity check
content, tag_removed = self.remove_tag(content, 'vkev')
if tag_removed:
print(f"Removed vkev tag from {file_path.name} (no longer in VulnCheck KEV catalog)")
updated = True
# Write updated content if changes were made
if updated:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return True
return False
except Exception as e:
print(f"Error updating {file_path}: {e}")
self.error_count += 1
return False
def run(self):
"""Main execution function."""
print("Starting KEV tagging update...")
print("Starting enhanced KEV/vKEV tagging update...")
# Fetch KEV data
self.cisa_kev_cves = self.fetch_cisa_kev_data()
if not self.cisa_kev_cves:
print("No CISA KEV data available, exiting")
self.vulncheck_kev_cves = self.fetch_vulncheck_kev_data()
if not self.cisa_kev_cves and not self.vulncheck_kev_cves:
print("No KEV data available, exiting")
return
# Find all CVE templates
template_files = self.find_cve_templates()
print(f"Found {len(template_files)} CVE template files")
if not template_files:
print("No CVE templates found!")
return
# Process templates
kev_found = 0
non_kev_with_tag = 0
cisa_kev_found = 0
vulncheck_kev_found = 0
both_kev_found = 0
for file_path in template_files:
cve_id = self.extract_cve_id(file_path)
if cve_id:
is_kev = cve_id in self.cisa_kev_cves
if is_kev:
kev_found += 1
if self.update_template_with_kev(file_path, cve_id, is_kev):
is_cisa_kev = cve_id in self.cisa_kev_cves
is_vulncheck_kev = cve_id in self.vulncheck_kev_cves
if is_cisa_kev:
cisa_kev_found += 1
if is_vulncheck_kev:
vulncheck_kev_found += 1
if is_cisa_kev and is_vulncheck_kev:
both_kev_found += 1
if self.update_template_tags(file_path, cve_id, is_cisa_kev, is_vulncheck_kev):
self.updated_count += 1
# Track non-KEV CVEs that have kev tags for reporting
if not is_kev and self.has_kev_tag(open(file_path, 'r').read()):
non_kev_with_tag += 1
else:
print(f"Could not extract CVE ID from {file_path}")
print(f"\nKEV update complete!")
print(f"\nEnhanced KEV/vKEV update complete!")
print(f"Templates updated: {self.updated_count}")
print(f"KEV CVEs found in templates: {kev_found}")
print(f"CISA KEV CVEs found in templates: {cisa_kev_found}")
print(f"VulnCheck KEV CVEs found in templates: {vulncheck_kev_found}")
print(f"CVEs in both CISA and VulnCheck KEV: {both_kev_found}")
print(f"CISA KEV catalog size: {len(self.cisa_kev_cves)}")
print(f"Non-KEV templates with kev tags: {non_kev_with_tag}")
print(f"VulnCheck KEV catalog size: {len(self.vulncheck_kev_cves)}")
print(f"Errors: {self.error_count}")
def main():
@@ -250,8 +344,8 @@ def main():
root_dir = sys.argv[1]
else:
root_dir = os.getcwd()
updater = KEVUpdater(root_dir)
updater = EnhancedKEVUpdater(root_dir)
updater.run()
if __name__ == "__main__":

View File

@@ -1,4 +1,4 @@
name: 📋 KEV Tag Update
name: 📋 KEV & vKEV Tag Update
on:
schedule:
@@ -23,10 +23,10 @@ jobs:
run: |
pip install requests pyyaml
- name: Update KEV tags
- name: Update KEV and vKEV tags
run: python .github/scripts/update-kev.py
env:
# Optional: Add VulnCheck API key if available
# VulnCheck API key for enhanced vKEV data
VULNCHECK_API_KEY: ${{ secrets.VULNCHECK_API_KEY }}
- name: Check for changes
@@ -47,7 +47,7 @@ jobs:
uses: projectdiscovery/actions/commit@v1
with:
files: '**/cves/**/*.yaml'
message: 'chore: update KEV tags and metadata 🤖'
message: 'chore: update KEV and vKEV tags and metadata 🤖'
- name: Push changes
if: steps.changes.outputs.changes == 'true'