#!/usr/bin/env python3
"""
Enhanced KEV and vKEV Tagging Updater for Nuclei Templates

This script updates KEV and vKEV (VulnCheck Known Exploited Vulnerabilities)
tags for all CVE templates by:

1. Fetching the latest CISA KEV catalog
2. Fetching VulnCheck's KEV data (if an API key is available)
3. Adding the 'kev' tag for CISA KEV CVEs
4. Adding the 'vkev' tag for VulnCheck KEV CVEs (including those not in CISA)
"""
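
# Example usage (a minimal sketch; the script filename below is an assumption,
# adjust it to wherever this file lives in the repository):
#
#   export VULNCHECK_API_KEY="..."   # optional, enables vKEV tagging
#   python3 update-kev-vkev-tags.py /path/to/nuclei-templates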

import os
import re
import sys
import time

import requests

from pathlib import Path
from typing import Optional, Set

# Configuration
CISA_KEV_JSON_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
VULNCHECK_KEV_API_URL = "https://api.vulncheck.com/v3/index/vulncheck-kev"
TIMEOUT = 30
MAX_RETRIES = 3
VULNCHECK_PAGE_SIZE = 100


class EnhancedKEVUpdater:
    def __init__(self, root_dir: str):
        self.root_dir = Path(root_dir)
        self.updated_count = 0
        self.error_count = 0
        self.cisa_kev_cves = set()
        self.vulncheck_kev_cves = set()
        self.vulncheck_api_key = os.getenv('VULNCHECK_API_KEY')

    def find_cve_templates(self) -> list[Path]:
        """Find all CVE template files."""
        cve_files = []

        # Search for CVE templates in common directories
        patterns = [
            "**/cves/**/*.yaml",
            "**/cve-*.yaml"
        ]

        for pattern in patterns:
            cve_files.extend(self.root_dir.glob(pattern))

        return sorted(set(cve_files))

    def extract_cve_id(self, file_path: Path) -> Optional[str]:
        """Extract the CVE ID from the filename or the template content."""
        # First try the filename
        filename = file_path.stem
        cve_match = re.search(r'CVE-\d{4}-\d+', filename, re.IGNORECASE)
        if cve_match:
            return cve_match.group().upper()

        # If not found in the filename, try the template content
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            cve_match = re.search(r'cve-id:\s*(CVE-\d{4}-\d+)', content, re.IGNORECASE)
            if cve_match:
                return cve_match.group(1).upper()
        except Exception as e:
            print(f"Error reading {file_path}: {e}")

        return None

    def fetch_cisa_kev_data(self) -> Set[str]:
        """Fetch the CISA KEV catalog and return a set of CVE IDs."""
        kev_cves = set()

        for attempt in range(MAX_RETRIES):
            try:
                print("Fetching CISA KEV catalog...")
                response = requests.get(CISA_KEV_JSON_URL, timeout=TIMEOUT)
                response.raise_for_status()

                data = response.json()

                if 'vulnerabilities' in data:
                    for vuln in data['vulnerabilities']:
                        if 'cveID' in vuln:
                            kev_cves.add(vuln['cveID'].upper())

                    print(f"Retrieved {len(kev_cves)} CVEs from CISA KEV catalog")
                    break
                else:
                    print("Invalid CISA KEV data format")

            except requests.RequestException as e:
                print(f"CISA KEV fetch failed (attempt {attempt + 1}/{MAX_RETRIES}): {e}")
                if attempt == MAX_RETRIES - 1:
                    print("Failed to fetch CISA KEV data")
                else:
                    time.sleep(2 * (attempt + 1))

            except Exception as e:
                print(f"Unexpected error fetching CISA KEV: {e}")
                break

        return kev_cves
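
    # For reference, the parser above relies only on this subset of the CISA
    # KEV feed (an assumption based on the fields the code reads; all other
    # fields are ignored):
    #
    #   {"vulnerabilities": [{"cveID": "CVE-2021-44228", ...}, ...]}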

    def fetch_vulncheck_kev_data(self) -> Set[str]:
        """Fetch the VulnCheck KEV catalog and return a set of CVE IDs."""
        if not self.vulncheck_api_key:
            print("VulnCheck API key not available, skipping vKEV data fetch")
            return set()

        vkev_cves = set()
        page = 1
        total_pages = None
        pages_fetched = 0

        print("Fetching VulnCheck KEV catalog...")

        while True:
            for attempt in range(MAX_RETRIES):
                try:
                    headers = {
                        'Authorization': f'Bearer {self.vulncheck_api_key}',
                        'Accept': 'application/json'
                    }

                    params = {
                        'limit': VULNCHECK_PAGE_SIZE,
                        'page': page
                    }

                    response = requests.get(VULNCHECK_KEV_API_URL, headers=headers, params=params, timeout=TIMEOUT)
                    response.raise_for_status()

                    data = response.json()

                    if 'data' in data and data['data']:
                        for entry in data['data']:
                            if 'cve' in entry and entry['cve']:
                                for cve_id in entry['cve']:
                                    if cve_id and cve_id.startswith('CVE-'):
                                        vkev_cves.add(cve_id.upper())

                        # Check pagination info
                        meta = data.get('_meta', {})
                        total_pages = meta.get('total_pages', 0)
                        current_page = meta.get('page', page)

                        pages_fetched += 1
                        print(f"Fetched page {current_page}/{total_pages} ({len(data['data'])} entries, {len(vkev_cves)} unique CVEs so far)")

                        # Check if we've reached the last page
                        if current_page >= total_pages:
                            print(f"Completed fetching all {pages_fetched} pages")
                            return vkev_cves

                        # Rate limiting: small delay between requests to avoid hitting API limits
                        if pages_fetched % 10 == 0:  # Every 10 pages, take a longer pause
                            print(f"Pausing for rate limiting after {pages_fetched} pages...")
                            time.sleep(2)
                        else:
                            time.sleep(0.5)  # Small delay between requests

                        page += 1
                        break  # Break the retry loop, continue with the next page
                    else:
                        print("No more VulnCheck KEV data")
                        return vkev_cves

                except requests.RequestException as e:
                    print(f"VulnCheck KEV fetch failed (attempt {attempt + 1}/{MAX_RETRIES}, page {page}): {e}")
                    if attempt == MAX_RETRIES - 1:
                        print(f"Failed to fetch VulnCheck KEV data for page {page}")
                        return vkev_cves  # Return what we have so far
                    else:
                        # Exponential backoff with rate limiting in mind
                        delay = min(10, 2 * (attempt + 1))
                        print(f"Retrying after {delay} seconds...")
                        time.sleep(delay)

                except Exception as e:
                    print(f"Unexpected error fetching VulnCheck KEV: {e}")
                    return vkev_cves  # Return what we have so far

        print(f"Retrieved {len(vkev_cves)} CVEs from VulnCheck KEV catalog")
        return vkev_cves
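
    # For reference, the pagination loop above assumes a response of roughly
    # this shape (inferred from the fields the code reads, not a complete
    # description of the VulnCheck API):
    #
    #   {
    #       "_meta": {"page": 1, "total_pages": 42, ...},
    #       "data": [{"cve": ["CVE-2023-XXXXX", ...], ...}, ...]
    #   }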

    def has_tag(self, content: str, tag: str) -> bool:
        """Check if the template already has the given tag in its tags field."""
        # Look for the tags field and check whether the tag is present
        tags_match = re.search(r'tags:\s*([^\n]+)', content)
        if tags_match:
            tags_str = tags_match.group(1)
            # Check for the tag as a standalone tag (not part of another word)
            if re.search(rf'\b{tag}\b', tags_str, re.IGNORECASE):
                return True
        return False

    def add_tag(self, content: str, tag: str) -> tuple[str, bool]:
        """Add a tag to the tags field. Returns (updated_content, was_updated)."""
        if self.has_tag(content, tag):
            return content, False

        # Find the tags field
        tags_match = re.search(r'(\s*tags:\s*)([^\n]+)', content)
        if tags_match:
            indent = tags_match.group(1)
            existing_tags = tags_match.group(2).rstrip()

            # Append the tag at the end
            new_tags = existing_tags + f',{tag}'
            new_line = f"{indent}{new_tags}"

            updated_content = content.replace(tags_match.group(0), new_line)
            return updated_content, True
        else:
            print(f"Warning: No tags field found to add {tag} tag")
            return content, False
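
    # For reference, add_tag (above) and remove_tag (below) rewrite the
    # template's inline tags line; for example, adding 'kev' turns
    #
    #   tags: cve,cve2021,rce
    # into
    #   tags: cve,cve2021,rce,kev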

    def remove_tag(self, content: str, tag: str) -> tuple[str, bool]:
        """Remove a tag from the tags field. Returns (updated_content, was_updated)."""
        if not self.has_tag(content, tag):
            return content, False

        # Find and update the tags field
        tags_match = re.search(r'(\s*tags:\s*)([^\n]+)', content)
        if tags_match:
            indent = tags_match.group(1)
            existing_tags = tags_match.group(2)

            # Remove the tag wherever it appears in the list
            new_tags = existing_tags
            new_tags = re.sub(rf',\s*{tag}\b', '', new_tags)       # ,tag (middle or end)
            new_tags = re.sub(rf'\b{tag}\s*,', '', new_tags)       # tag, followed by others
            new_tags = re.sub(rf'^\s*{tag}\s*$', '', new_tags)     # tag is the only tag
            new_tags = re.sub(rf'^\s*{tag}\s*,\s*', '', new_tags)  # tag is the first tag

            if new_tags != existing_tags:
                new_line = f"{indent}{new_tags}"
                updated_content = content.replace(tags_match.group(0), new_line)
                return updated_content, True

        return content, False

    def update_template_tags(self, file_path: Path, cve_id: str, is_cisa_kev: bool, is_vulncheck_kev: bool) -> bool:
        """Update a template file with KEV and vKEV tags."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            original_content = content
            updated = False

            # Handle the CISA KEV tag
            if is_cisa_kev:
                # Add the kev tag if missing
                content, tag_added = self.add_tag(content, 'kev')
                if tag_added:
                    print(f"Added kev tag to {file_path.name}")
                    updated = True
            else:
                # Remove the kev tag if present but the CVE is not in the CISA KEV catalog
                if self.has_tag(content, 'kev'):
                    # Only remove if we're confident about our KEV data
                    if len(self.cisa_kev_cves) > 1000:  # Sanity check
                        content, tag_removed = self.remove_tag(content, 'kev')
                        if tag_removed:
                            print(f"Removed kev tag from {file_path.name} (no longer in CISA KEV catalog)")
                            updated = True
                    else:
                        print(f"Warning: {cve_id} has kev tag but not in current CISA KEV catalog (keeping tag)")

            # Handle the VulnCheck KEV tag
            if is_vulncheck_kev:
                # Add the vkev tag if missing
                content, tag_added = self.add_tag(content, 'vkev')
                if tag_added:
                    print(f"Added vkev tag to {file_path.name}")
                    updated = True
            else:
                # Remove the vkev tag if present but the CVE is not in the VulnCheck KEV catalog
                if self.has_tag(content, 'vkev') and len(self.vulncheck_kev_cves) > 100:  # Sanity check
                    content, tag_removed = self.remove_tag(content, 'vkev')
                    if tag_removed:
                        print(f"Removed vkev tag from {file_path.name} (no longer in VulnCheck KEV catalog)")
                        updated = True

            # Write the updated content back if changes were made
            if updated:
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(content)
                return True

            return False

        except Exception as e:
            print(f"Error updating {file_path}: {e}")
            self.error_count += 1
            return False

    def run(self):
        """Main execution function."""
        print("Starting enhanced KEV/vKEV tagging update...")

        # Fetch KEV data
        self.cisa_kev_cves = self.fetch_cisa_kev_data()
        self.vulncheck_kev_cves = self.fetch_vulncheck_kev_data()

        if not self.cisa_kev_cves and not self.vulncheck_kev_cves:
            print("No KEV data available, exiting")
            return

        # Find all CVE templates
        template_files = self.find_cve_templates()
        print(f"Found {len(template_files)} CVE template files")

        if not template_files:
            print("No CVE templates found!")
            return

        # Process templates
        cisa_kev_found = 0
        vulncheck_kev_found = 0
        both_kev_found = 0

        for file_path in template_files:
            cve_id = self.extract_cve_id(file_path)
            if cve_id:
                is_cisa_kev = cve_id in self.cisa_kev_cves
                is_vulncheck_kev = cve_id in self.vulncheck_kev_cves

                if is_cisa_kev:
                    cisa_kev_found += 1
                if is_vulncheck_kev:
                    vulncheck_kev_found += 1
                if is_cisa_kev and is_vulncheck_kev:
                    both_kev_found += 1

                if self.update_template_tags(file_path, cve_id, is_cisa_kev, is_vulncheck_kev):
                    self.updated_count += 1
            else:
                print(f"Could not extract CVE ID from {file_path}")

        print("\nEnhanced KEV/vKEV update complete!")
        print(f"Templates updated: {self.updated_count}")
        print(f"CISA KEV CVEs found in templates: {cisa_kev_found}")
        print(f"VulnCheck KEV CVEs found in templates: {vulncheck_kev_found}")
        print(f"CVEs in both CISA and VulnCheck KEV: {both_kev_found}")
        print(f"CISA KEV catalog size: {len(self.cisa_kev_cves)}")
        print(f"VulnCheck KEV catalog size: {len(self.vulncheck_kev_cves)}")
        print(f"Errors: {self.error_count}")


def main():
    """Main entry point."""
    if len(sys.argv) > 1:
        root_dir = sys.argv[1]
    else:
        root_dir = os.getcwd()

    updater = EnhancedKEVUpdater(root_dir)
    updater.run()


if __name__ == "__main__":
    main()