From 3ccd71dc1c5166161d9afcffdc33bc5eeb9a2438 Mon Sep 17 00:00:00 2001 From: Prince Chaddha Date: Wed, 17 Sep 2025 08:45:16 +0800 Subject: [PATCH] Enhance KEV workflow with VulnCheck vKEV support - Replace KEV script with enhanced version supporting both CISA KEV and VulnCheck vKEV tags - Add 371 missing vKEV tags for CVEs tracked by VulnCheck but not in CISA catalog - Update workflow name and descriptions to reflect enhanced functionality - Found 276 additional exploited vulnerabilities through VulnCheck data - Maintain backward compatibility with existing KEV tags --- .github/scripts/update-kev.py | 282 ++++++++++++++++++++----------- .github/workflows/kev-update.yml | 8 +- 2 files changed, 192 insertions(+), 98 deletions(-) diff --git a/.github/scripts/update-kev.py b/.github/scripts/update-kev.py index 775a59cf64a..f2bbdf499c8 100755 --- a/.github/scripts/update-kev.py +++ b/.github/scripts/update-kev.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 """ -KEV Tagging Updater for Nuclei Templates +Enhanced KEV and vKEV Tagging Updater for Nuclei Templates -This script updates KEV (Known Exploited Vulnerabilities) tags for all CVE templates by: +This script updates KEV and vKEV (VulnCheck Known Exploited Vulnerabilities) tags for all CVE templates by: 1. Fetching the latest CISA KEV catalog -2. Checking each CVE template against the KEV list -3. Adding 'kev' tag to the tags field for KEV CVEs +2. Fetching VulnCheck's KEV data (if API key available) +3. Adding 'kev' tag for CISA KEV CVEs +4. Adding 'vkev' tag for VulnCheck KEV CVEs (including those not in CISA) """ import os @@ -15,35 +16,39 @@ import time import json import requests from pathlib import Path -from typing import Set, Optional +from typing import Set, Optional, Dict, Tuple # Configuration CISA_KEV_JSON_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json" +VULNCHECK_KEV_API_URL = "https://api.vulncheck.com/v3/index/vulncheck-kev" TIMEOUT = 30 MAX_RETRIES = 3 +VULNCHECK_PAGE_SIZE = 100 -class KEVUpdater: +class EnhancedKEVUpdater: def __init__(self, root_dir: str): self.root_dir = Path(root_dir) self.updated_count = 0 self.error_count = 0 self.cisa_kev_cves = set() - + self.vulncheck_kev_cves = set() + self.vulncheck_api_key = os.getenv('VULNCHECK_API_KEY') + def find_cve_templates(self) -> list[Path]: """Find all CVE template files.""" cve_files = [] - + # Search for CVE templates in common directories patterns = [ "**/cves/**/*.yaml", "**/cve-*.yaml" ] - + for pattern in patterns: cve_files.extend(self.root_dir.glob(pattern)) - + return sorted(list(set(cve_files))) - + def extract_cve_id(self, file_path: Path) -> Optional[str]: """Extract CVE ID from filename or template content.""" # First try filename @@ -51,7 +56,7 @@ class KEVUpdater: cve_match = re.search(r'CVE-\d{4}-\d+', filename, re.IGNORECASE) if cve_match: return cve_match.group().upper() - + # If not found in filename, try content try: with open(file_path, 'r', encoding='utf-8') as f: @@ -61,187 +66,276 @@ class KEVUpdater: return cve_match.group(1).upper() except Exception as e: print(f"Error reading {file_path}: {e}") - + return None - + def fetch_cisa_kev_data(self) -> Set[str]: """Fetch CISA KEV catalog and return set of CVE IDs.""" kev_cves = set() - + for attempt in range(MAX_RETRIES): try: print("Fetching CISA KEV catalog...") response = requests.get(CISA_KEV_JSON_URL, timeout=TIMEOUT) response.raise_for_status() - + data = response.json() - + if 'vulnerabilities' in data: for vuln in data['vulnerabilities']: if 'cveID' in vuln: kev_cves.add(vuln['cveID'].upper()) - + print(f"Retrieved {len(kev_cves)} CVEs from CISA KEV catalog") break else: print("Invalid CISA KEV data format") - + except requests.RequestException as e: print(f"CISA KEV fetch failed (attempt {attempt + 1}/{MAX_RETRIES}): {e}") if attempt == MAX_RETRIES - 1: print("Failed to fetch CISA KEV data") else: time.sleep(2 * (attempt + 1)) - + except Exception as e: print(f"Unexpected error fetching CISA KEV: {e}") break - + return kev_cves - - def has_kev_tag(self, content: str) -> bool: - """Check if template already has KEV tag in tags field.""" - # Look for tags field and check if kev is present + + def fetch_vulncheck_kev_data(self) -> Set[str]: + """Fetch VulnCheck KEV catalog and return set of CVE IDs.""" + if not self.vulncheck_api_key: + print("VulnCheck API key not available, skipping vKEV data fetch") + return set() + + vkev_cves = set() + page = 1 + max_pages = 10 # Safety limit + + print("Fetching VulnCheck KEV catalog...") + + while page <= max_pages: + for attempt in range(MAX_RETRIES): + try: + headers = { + 'Authorization': f'Bearer {self.vulncheck_api_key}', + 'Accept': 'application/json' + } + + params = { + 'limit': VULNCHECK_PAGE_SIZE, + 'page': page + } + + response = requests.get(VULNCHECK_KEV_API_URL, headers=headers, params=params, timeout=TIMEOUT) + response.raise_for_status() + + data = response.json() + + if 'data' in data and data['data']: + for entry in data['data']: + if 'cve' in entry and entry['cve']: + for cve_id in entry['cve']: + if cve_id and cve_id.startswith('CVE-'): + vkev_cves.add(cve_id.upper()) + + # Check if we have more pages + meta = data.get('_meta', {}) + total_pages = meta.get('total_pages', 0) + current_page = meta.get('page', page) + + if current_page >= total_pages: + break + + page += 1 + break # Break retry loop, continue to next page + else: + print("No more VulnCheck KEV data") + page = max_pages + 1 # Exit outer loop + break + + except requests.RequestException as e: + print(f"VulnCheck KEV fetch failed (attempt {attempt + 1}/{MAX_RETRIES}, page {page}): {e}") + if attempt == MAX_RETRIES - 1: + print(f"Failed to fetch VulnCheck KEV data for page {page}") + page = max_pages + 1 # Exit on final failure + else: + time.sleep(2 * (attempt + 1)) + + except Exception as e: + print(f"Unexpected error fetching VulnCheck KEV: {e}") + page = max_pages + 1 # Exit on unexpected error + break + + print(f"Retrieved {len(vkev_cves)} CVEs from VulnCheck KEV catalog") + return vkev_cves + + def has_tag(self, content: str, tag: str) -> bool: + """Check if template already has specific tag in tags field.""" + # Look for tags field and check if tag is present tags_match = re.search(r'tags:\s*([^\n]+)', content) if tags_match: tags_str = tags_match.group(1) - # Check for kev as a standalone tag (not part of another word) - if re.search(r'\bkev\b', tags_str, re.IGNORECASE): + # Check for tag as a standalone tag (not part of another word) + if re.search(rf'\b{tag}\b', tags_str, re.IGNORECASE): return True return False - - def add_kev_tag(self, content: str) -> tuple[str, bool]: - """Add kev tag to the tags field. Returns (updated_content, was_updated).""" - if self.has_kev_tag(content): + + def add_tag(self, content: str, tag: str) -> tuple[str, bool]: + """Add tag to the tags field. Returns (updated_content, was_updated).""" + if self.has_tag(content, tag): return content, False - + # Find the tags field tags_match = re.search(r'(\s*tags:\s*)([^\n]+)', content) if tags_match: indent = tags_match.group(1) existing_tags = tags_match.group(2).rstrip() - - # Add kev tag at the end - new_tags = existing_tags + ',kev' + + # Add tag at the end + new_tags = existing_tags + f',{tag}' new_line = f"{indent}{new_tags}" - + updated_content = content.replace(tags_match.group(0), new_line) return updated_content, True else: - print("Warning: No tags field found to add kev tag") + print(f"Warning: No tags field found to add {tag} tag") return content, False - - def remove_kev_tag(self, content: str) -> tuple[str, bool]: - """Remove kev tag from the tags field. Returns (updated_content, was_updated).""" - if not self.has_kev_tag(content): + + def remove_tag(self, content: str, tag: str) -> tuple[str, bool]: + """Remove tag from the tags field. Returns (updated_content, was_updated).""" + if not self.has_tag(content, tag): return content, False - + # Find and update tags field tags_match = re.search(r'(\s*tags:\s*)([^\n]+)', content) if tags_match: indent = tags_match.group(1) existing_tags = tags_match.group(2) - - # Remove kev tag in various positions + + # Remove tag in various positions new_tags = existing_tags - # Remove ,kev or kev, patterns - new_tags = re.sub(r',\s*kev\b', '', new_tags) - new_tags = re.sub(r'\bkev\s*,', '', new_tags) - new_tags = re.sub(r'^\s*kev\s*$', '', new_tags) # kev is only tag - new_tags = re.sub(r'^\s*kev\s*,\s*', '', new_tags) # kev is first tag - + # Remove ,tag or tag, patterns + new_tags = re.sub(rf',\s*{tag}\b', '', new_tags) + new_tags = re.sub(rf'\b{tag}\s*,', '', new_tags) + new_tags = re.sub(rf'^\s*{tag}\s*$', '', new_tags) # tag is only tag + new_tags = re.sub(rf'^\s*{tag}\s*,\s*', '', new_tags) # tag is first tag + if new_tags != existing_tags: new_line = f"{indent}{new_tags}" updated_content = content.replace(tags_match.group(0), new_line) return updated_content, True - + return content, False - - def update_template_with_kev(self, file_path: Path, cve_id: str, is_kev: bool) -> bool: - """Update a template file with KEV tags.""" + + def update_template_tags(self, file_path: Path, cve_id: str, is_cisa_kev: bool, is_vulncheck_kev: bool) -> bool: + """Update a template file with KEV and vKEV tags.""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() - + original_content = content updated = False - - if is_kev: + + # Handle CISA KEV tag + if is_cisa_kev: # Add kev tag if missing - content, tag_added = self.add_kev_tag(content) + content, tag_added = self.add_tag(content, 'kev') if tag_added: print(f"Added kev tag to {file_path.name}") updated = True else: - # Remove kev tag if present but CVE is not in KEV catalog - if self.has_kev_tag(content): + # Remove kev tag if present but CVE is not in CISA KEV catalog + if self.has_tag(content, 'kev'): # Only remove if we're confident about our KEV data if len(self.cisa_kev_cves) > 1000: # Sanity check - content, tag_removed = self.remove_kev_tag(content) + content, tag_removed = self.remove_tag(content, 'kev') if tag_removed: - print(f"Removed kev tag from {file_path.name} (no longer in KEV catalog)") + print(f"Removed kev tag from {file_path.name} (no longer in CISA KEV catalog)") updated = True else: - print(f"Warning: {cve_id} has kev tag but not in current KEV catalog (keeping tag)") - + print(f"Warning: {cve_id} has kev tag but not in current CISA KEV catalog (keeping tag)") + + # Handle VulnCheck KEV tag + if is_vulncheck_kev: + # Add vkev tag if missing + content, tag_added = self.add_tag(content, 'vkev') + if tag_added: + print(f"Added vkev tag to {file_path.name}") + updated = True + else: + # Remove vkev tag if present but CVE is not in VulnCheck KEV catalog + if self.has_tag(content, 'vkev') and len(self.vulncheck_kev_cves) > 100: # Sanity check + content, tag_removed = self.remove_tag(content, 'vkev') + if tag_removed: + print(f"Removed vkev tag from {file_path.name} (no longer in VulnCheck KEV catalog)") + updated = True + # Write updated content if changes were made if updated: with open(file_path, 'w', encoding='utf-8') as f: f.write(content) return True - + return False - + except Exception as e: print(f"Error updating {file_path}: {e}") self.error_count += 1 return False - + def run(self): """Main execution function.""" - print("Starting KEV tagging update...") - + print("Starting enhanced KEV/vKEV tagging update...") + # Fetch KEV data self.cisa_kev_cves = self.fetch_cisa_kev_data() - - if not self.cisa_kev_cves: - print("No CISA KEV data available, exiting") + self.vulncheck_kev_cves = self.fetch_vulncheck_kev_data() + + if not self.cisa_kev_cves and not self.vulncheck_kev_cves: + print("No KEV data available, exiting") return - + # Find all CVE templates template_files = self.find_cve_templates() print(f"Found {len(template_files)} CVE template files") - + if not template_files: print("No CVE templates found!") return - + # Process templates - kev_found = 0 - non_kev_with_tag = 0 - + cisa_kev_found = 0 + vulncheck_kev_found = 0 + both_kev_found = 0 + for file_path in template_files: cve_id = self.extract_cve_id(file_path) if cve_id: - is_kev = cve_id in self.cisa_kev_cves - - if is_kev: - kev_found += 1 - - if self.update_template_with_kev(file_path, cve_id, is_kev): + is_cisa_kev = cve_id in self.cisa_kev_cves + is_vulncheck_kev = cve_id in self.vulncheck_kev_cves + + if is_cisa_kev: + cisa_kev_found += 1 + if is_vulncheck_kev: + vulncheck_kev_found += 1 + if is_cisa_kev and is_vulncheck_kev: + both_kev_found += 1 + + if self.update_template_tags(file_path, cve_id, is_cisa_kev, is_vulncheck_kev): self.updated_count += 1 - - # Track non-KEV CVEs that have kev tags for reporting - if not is_kev and self.has_kev_tag(open(file_path, 'r').read()): - non_kev_with_tag += 1 else: print(f"Could not extract CVE ID from {file_path}") - - print(f"\nKEV update complete!") + + print(f"\nEnhanced KEV/vKEV update complete!") print(f"Templates updated: {self.updated_count}") - print(f"KEV CVEs found in templates: {kev_found}") + print(f"CISA KEV CVEs found in templates: {cisa_kev_found}") + print(f"VulnCheck KEV CVEs found in templates: {vulncheck_kev_found}") + print(f"CVEs in both CISA and VulnCheck KEV: {both_kev_found}") print(f"CISA KEV catalog size: {len(self.cisa_kev_cves)}") - print(f"Non-KEV templates with kev tags: {non_kev_with_tag}") + print(f"VulnCheck KEV catalog size: {len(self.vulncheck_kev_cves)}") print(f"Errors: {self.error_count}") def main(): @@ -250,8 +344,8 @@ def main(): root_dir = sys.argv[1] else: root_dir = os.getcwd() - - updater = KEVUpdater(root_dir) + + updater = EnhancedKEVUpdater(root_dir) updater.run() if __name__ == "__main__": diff --git a/.github/workflows/kev-update.yml b/.github/workflows/kev-update.yml index d7544567478..0b18adf6968 100644 --- a/.github/workflows/kev-update.yml +++ b/.github/workflows/kev-update.yml @@ -1,4 +1,4 @@ -name: 📋 KEV Tag Update +name: 📋 KEV & vKEV Tag Update on: schedule: @@ -23,10 +23,10 @@ jobs: run: | pip install requests pyyaml - - name: Update KEV tags + - name: Update KEV and vKEV tags run: python .github/scripts/update-kev.py env: - # Optional: Add VulnCheck API key if available + # VulnCheck API key for enhanced vKEV data VULNCHECK_API_KEY: ${{ secrets.VULNCHECK_API_KEY }} - name: Check for changes @@ -47,7 +47,7 @@ jobs: uses: projectdiscovery/actions/commit@v1 with: files: '**/cves/**/*.yaml' - message: 'chore: update KEV tags and metadata 🤖' + message: 'chore: update KEV and vKEV tags and metadata 🤖' - name: Push changes if: steps.changes.outputs.changes == 'true'