feat(api): standardize API endpoints and update data generation logic

- Rename IP address endpoints from `/ip-addresses/` to `/host-ports` for consistency
- Update vulnerability endpoints from `/assets/vulnerabilities/` to `/vulnerabilities/`
- Remove trailing slashes from API endpoint paths for standardization
- Remove explicit `type` field from target generation in seed data
- Enhance website generation with deduplication logic and attempt limiting
- Add default admin user seed data to database initialization migration
- Improve data generator to prevent infinite loops and handle unique URL combinations
- Align frontend service calls with updated backend API structure
This commit is contained in:
yyhuni
2026-01-15 13:02:26 +08:00
parent 794846ca7a
commit 6f02d9f3c5
5 changed files with 717 additions and 115 deletions

View File

@@ -9,12 +9,12 @@ export interface BulkDeleteResponse {
export class IPAddressService {
/**
* Bulk delete IP addresses
* POST /api/ip-addresses/bulk-delete/
* POST /api/host-ports/bulk-delete
* Note: IP addresses are aggregated, so we pass IP strings instead of IDs
*/
static async bulkDelete(ips: string[]): Promise<BulkDeleteResponse> {
const response = await api.post<BulkDeleteResponse>(
`/ip-addresses/bulk-delete/`,
`/host-ports/bulk-delete`,
{ ips }
)
return response.data
@@ -24,7 +24,7 @@ export class IPAddressService {
targetId: number,
params?: GetIPAddressesParams
): Promise<GetIPAddressesResponse> {
const response = await api.get<GetIPAddressesResponse>(`/targets/${targetId}/ip-addresses/`, {
const response = await api.get<GetIPAddressesResponse>(`/targets/${targetId}/host-ports`, {
params: {
page: params?.page || 1,
pageSize: params?.pageSize || 10,
@@ -38,7 +38,7 @@ export class IPAddressService {
scanId: number,
params?: GetIPAddressesParams
): Promise<GetIPAddressesResponse> {
const response = await api.get<GetIPAddressesResponse>(`/scans/${scanId}/ip-addresses/`, {
const response = await api.get<GetIPAddressesResponse>(`/scans/${scanId}/host-ports`, {
params: {
page: params?.page || 1,
pageSize: params?.pageSize || 10,
@@ -54,7 +54,7 @@ export class IPAddressService {
if (ips && ips.length > 0) {
params.ips = ips.join(',')
}
const response = await api.get<Blob>(`/targets/${targetId}/ip-addresses/export/`, {
const response = await api.get<Blob>(`/targets/${targetId}/host-ports/export`, {
params,
responseType: 'blob',
})
@@ -63,7 +63,7 @@ export class IPAddressService {
/** Export all IP addresses by scan task (CSV format) */
static async exportIPAddressesByScanId(scanId: number): Promise<Blob> {
const response = await api.get<Blob>(`/scans/${scanId}/ip-addresses/export/`, {
const response = await api.get<Blob>(`/scans/${scanId}/host-ports/export`, {
responseType: 'blob',
})
return response.data

View File

@@ -12,7 +12,7 @@ export class VulnerabilityService {
await mockDelay()
return getMockVulnerabilities(params)
}
const response = await api.get(`/assets/vulnerabilities/`, {
const response = await api.get(`/vulnerabilities/`, {
params: { ...params, filter },
})
return response.data
@@ -20,7 +20,7 @@ export class VulnerabilityService {
/** Get single vulnerability by ID */
static async getVulnerabilityById(id: number): Promise<Vulnerability> {
const response = await api.get<Vulnerability>(`/assets/vulnerabilities/${id}/`)
const response = await api.get<Vulnerability>(`/vulnerabilities/${id}/`)
return response.data
}
@@ -74,7 +74,7 @@ export class VulnerabilityService {
/** Get global vulnerability stats */
static async getStats(): Promise<{ total: number; pendingCount: number; reviewedCount: number }> {
const response = await api.get(`/assets/vulnerabilities/stats/`)
const response = await api.get(`/vulnerabilities/stats/`)
return response.data
}

View File

@@ -591,3 +591,13 @@ CREATE INDEX IF NOT EXISTS idx_scan_engine_ids_gin ON scan USING GIN (engine_ids
-- GIN index for scan.container_ids array
CREATE INDEX IF NOT EXISTS idx_scan_container_ids_gin ON scan USING GIN (container_ids);
-- ============================================
-- Seed data
-- ============================================
-- Default admin user (password: admin)
-- Password hash generated with bcrypt
INSERT INTO auth_user (username, password, is_superuser, is_staff, is_active, date_joined)
VALUES ('admin', '$2b$12$.4wL49eZfJuwVjP85Qxa7.xFb7HE3TDer4wcF9Z7c.oTOo7fExlgq', TRUE, TRUE, TRUE, CURRENT_TIMESTAMP)
ON CONFLICT (username) DO NOTHING;

View File

@@ -97,7 +97,7 @@ class DataGenerator:
if name not in used_names:
used_names.add(name)
targets.append({"name": name, "type": "domain"})
targets.append({"name": name})
break
# Generate IPs
@@ -108,7 +108,7 @@ class DataGenerator:
if name not in used_names:
used_names.add(name)
targets.append({"name": name, "type": "ip"})
targets.append({"name": name})
break
# Generate CIDRs
@@ -120,7 +120,7 @@ class DataGenerator:
if name not in used_names:
used_names.add(name)
targets.append({"name": name, "type": "cidr"})
targets.append({"name": name})
break
return targets
@@ -165,22 +165,39 @@ class DataGenerator:
def generate_websites(target: Dict[str, Any], count: int) -> List[Dict[str, Any]]:
"""
Generate website data for a target.
Args:
target: Target data (must have 'name' and 'type')
count: Number of websites to generate
Returns:
List of website data dictionaries with camelCase fields
"""
websites = []
for i in range(count):
protocol = DataGenerator.PROTOCOLS[i % len(DataGenerator.PROTOCOLS)]
subdomain = DataGenerator.SUBDOMAINS[i % len(DataGenerator.SUBDOMAINS)]
port = DataGenerator.PORTS[i % len(DataGenerator.PORTS)]
path = DataGenerator.PATHS[i % len(DataGenerator.PATHS)]
used_urls = set()
# Calculate total possible combinations to avoid infinite loop
max_combinations = len(DataGenerator.PROTOCOLS) * len(DataGenerator.SUBDOMAINS) * len(DataGenerator.PORTS) * len(DataGenerator.PATHS)
actual_count = min(count, max_combinations)
i = 0
attempts = 0
max_attempts = actual_count * 10 # Prevent infinite loop
while len(websites) < actual_count and attempts < max_attempts:
attempts += 1
# Use different strategies to generate unique combinations
protocol_idx = i % len(DataGenerator.PROTOCOLS)
subdomain_idx = (i // len(DataGenerator.PROTOCOLS)) % len(DataGenerator.SUBDOMAINS)
port_idx = (i // (len(DataGenerator.PROTOCOLS) * len(DataGenerator.SUBDOMAINS))) % len(DataGenerator.PORTS)
path_idx = (i // (len(DataGenerator.PROTOCOLS) * len(DataGenerator.SUBDOMAINS) * len(DataGenerator.PORTS))) % len(DataGenerator.PATHS)
protocol = DataGenerator.PROTOCOLS[protocol_idx]
subdomain = DataGenerator.SUBDOMAINS[subdomain_idx]
port = DataGenerator.PORTS[port_idx]
path = DataGenerator.PATHS[path_idx]
# Generate URL based on target type
if target["type"] == "domain":
url = f"{protocol}{subdomain}.{target['name']}{port}{path}"
@@ -191,24 +208,34 @@ class DataGenerator:
base_ip = target["name"].split("/")[0]
url = f"{protocol}{base_ip}{port}{path}"
else:
i += 1
continue
status_code = DataGenerator.STATUS_CODES[i % len(DataGenerator.STATUS_CODES)]
content_length = 1000 + (i * 100)
tech = DataGenerator.TECH_STACKS[i % len(DataGenerator.TECH_STACKS)]
vhost = (i % 5 == 0) # 20% are vhost
# Skip if URL already exists
if url in used_urls:
i += 1
continue
used_urls.add(url)
status_code = DataGenerator.STATUS_CODES[len(websites) % len(DataGenerator.STATUS_CODES)]
content_length = 1000 + (len(websites) * 100)
tech = DataGenerator.TECH_STACKS[len(websites) % len(DataGenerator.TECH_STACKS)]
vhost = (len(websites) % 5 == 0) # 20% are vhost
websites.append({
"url": url,
"title": DataGenerator.TITLES[i % len(DataGenerator.TITLES)],
"title": DataGenerator.TITLES[len(websites) % len(DataGenerator.TITLES)],
"statusCode": status_code,
"contentLength": content_length,
"contentType": DataGenerator.CONTENT_TYPES[i % len(DataGenerator.CONTENT_TYPES)],
"webserver": DataGenerator.WEBSERVERS[i % len(DataGenerator.WEBSERVERS)],
"contentType": DataGenerator.CONTENT_TYPES[len(websites) % len(DataGenerator.CONTENT_TYPES)],
"webserver": DataGenerator.WEBSERVERS[len(websites) % len(DataGenerator.WEBSERVERS)],
"tech": tech,
"vhost": vhost,
})
i += 1
return websites
@@ -223,40 +250,34 @@ class DataGenerator:
def generate_subdomains(target: Dict[str, Any], count: int) -> List[str]:
"""
Generate subdomain data for a domain target.
Args:
target: Target data (must be type 'domain')
count: Number of subdomains to generate
Returns:
List of subdomain names (strings)
Empty list if target is not a domain
"""
if target["type"] != "domain":
return []
subdomains = []
target_name = target['name']
# Extract base domain (remove first subdomain if exists)
# e.g., www.example.com -> example.com
parts = target_name.split('.')
if len(parts) > 2:
# Has subdomain, use base domain
base_domain = '.'.join(parts[1:])
else:
# No subdomain, use as is
base_domain = target_name
# Generate subdomains that end with .target_name
# e.g., target=portal.example.com -> www.portal.example.com
for i in range(count):
prefix = DataGenerator.SUBDOMAIN_PREFIXES[i % len(DataGenerator.SUBDOMAIN_PREFIXES)]
name = f"{prefix}.{base_domain}"
name = f"{prefix}.{target_name}"
# Skip if same as target name
if name == target_name:
continue
subdomains.append(name)
return subdomains
return subdomains
@@ -294,21 +315,36 @@ class DataGenerator:
def generate_endpoints(target: Dict[str, Any], count: int) -> List[Dict[str, Any]]:
"""
Generate endpoint data for a target.
Args:
target: Target data (must have 'name' and 'type')
count: Number of endpoints to generate
Returns:
List of endpoint data dictionaries with camelCase fields
"""
endpoints = []
for i in range(count):
protocol = DataGenerator.PROTOCOLS[i % len(DataGenerator.PROTOCOLS)]
subdomain = DataGenerator.SUBDOMAINS[i % len(DataGenerator.SUBDOMAINS)]
path = DataGenerator.API_PATHS[i % len(DataGenerator.API_PATHS)]
used_urls = set()
# Calculate total possible combinations
max_combinations = len(DataGenerator.PROTOCOLS) * len(DataGenerator.SUBDOMAINS) * len(DataGenerator.API_PATHS)
actual_count = min(count, max_combinations)
i = 0
attempts = 0
max_attempts = actual_count * 10
while len(endpoints) < actual_count and attempts < max_attempts:
attempts += 1
protocol_idx = i % len(DataGenerator.PROTOCOLS)
subdomain_idx = (i // len(DataGenerator.PROTOCOLS)) % len(DataGenerator.SUBDOMAINS)
path_idx = (i // (len(DataGenerator.PROTOCOLS) * len(DataGenerator.SUBDOMAINS))) % len(DataGenerator.API_PATHS)
protocol = DataGenerator.PROTOCOLS[protocol_idx]
subdomain = DataGenerator.SUBDOMAINS[subdomain_idx]
path = DataGenerator.API_PATHS[path_idx]
# Generate URL based on target type
if target["type"] == "domain":
url = f"{protocol}{subdomain}.{target['name']}{path}"
@@ -318,26 +354,36 @@ class DataGenerator:
base_ip = target["name"].split("/")[0]
url = f"{protocol}{base_ip}{path}"
else:
i += 1
continue
status_code = DataGenerator.STATUS_CODES[i % len(DataGenerator.STATUS_CODES)]
content_length = 500 + (i * 50)
tech = DataGenerator.API_TECH_STACKS[i % len(DataGenerator.API_TECH_STACKS)]
matched_gf = DataGenerator.GF_PATTERNS[i % len(DataGenerator.GF_PATTERNS)]
vhost = (i % 10 == 0) # 10% are vhost
# Skip if URL already exists
if url in used_urls:
i += 1
continue
used_urls.add(url)
status_code = DataGenerator.STATUS_CODES[len(endpoints) % len(DataGenerator.STATUS_CODES)]
content_length = 500 + (len(endpoints) * 50)
tech = DataGenerator.API_TECH_STACKS[len(endpoints) % len(DataGenerator.API_TECH_STACKS)]
matched_gf = DataGenerator.GF_PATTERNS[len(endpoints) % len(DataGenerator.GF_PATTERNS)]
vhost = (len(endpoints) % 10 == 0) # 10% are vhost
endpoints.append({
"url": url,
"title": DataGenerator.ENDPOINT_TITLES[i % len(DataGenerator.ENDPOINT_TITLES)],
"title": DataGenerator.ENDPOINT_TITLES[len(endpoints) % len(DataGenerator.ENDPOINT_TITLES)],
"statusCode": status_code,
"contentLength": content_length,
"contentType": "application/json",
"webserver": DataGenerator.WEBSERVERS[i % len(DataGenerator.WEBSERVERS)],
"webserver": DataGenerator.WEBSERVERS[len(endpoints) % len(DataGenerator.WEBSERVERS)],
"tech": tech,
"matchedGfPatterns": matched_gf,
"vhost": vhost,
})
i += 1
return endpoints
@@ -355,21 +401,36 @@ class DataGenerator:
def generate_directories(target: Dict[str, Any], count: int) -> List[Dict[str, Any]]:
"""
Generate directory data for a target.
Args:
target: Target data (must have 'name' and 'type')
count: Number of directories to generate
Returns:
List of directory data dictionaries with camelCase fields
"""
directories = []
for i in range(count):
protocol = DataGenerator.PROTOCOLS[i % len(DataGenerator.PROTOCOLS)]
subdomain = DataGenerator.SUBDOMAINS[i % len(DataGenerator.SUBDOMAINS)]
dir_path = DataGenerator.DIRECTORIES[i % len(DataGenerator.DIRECTORIES)]
used_urls = set()
# Calculate total possible combinations
max_combinations = len(DataGenerator.PROTOCOLS) * len(DataGenerator.SUBDOMAINS) * len(DataGenerator.DIRECTORIES)
actual_count = min(count, max_combinations)
i = 0
attempts = 0
max_attempts = actual_count * 10
while len(directories) < actual_count and attempts < max_attempts:
attempts += 1
protocol_idx = i % len(DataGenerator.PROTOCOLS)
subdomain_idx = (i // len(DataGenerator.PROTOCOLS)) % len(DataGenerator.SUBDOMAINS)
dir_idx = (i // (len(DataGenerator.PROTOCOLS) * len(DataGenerator.SUBDOMAINS))) % len(DataGenerator.DIRECTORIES)
protocol = DataGenerator.PROTOCOLS[protocol_idx]
subdomain = DataGenerator.SUBDOMAINS[subdomain_idx]
dir_path = DataGenerator.DIRECTORIES[dir_idx]
# Generate URL based on target type
if target["type"] == "domain":
url = f"{protocol}{subdomain}.{target['name']}{dir_path}"
@@ -379,20 +440,30 @@ class DataGenerator:
base_ip = target["name"].split("/")[0]
url = f"{protocol}{base_ip}{dir_path}"
else:
i += 1
continue
status = DataGenerator.DIR_STATUS_CODES[i % len(DataGenerator.DIR_STATUS_CODES)]
content_length = 1000 + (i * 100)
duration = 50 + (i * 5)
# Skip if URL already exists
if url in used_urls:
i += 1
continue
used_urls.add(url)
status = DataGenerator.DIR_STATUS_CODES[len(directories) % len(DataGenerator.DIR_STATUS_CODES)]
content_length = 1000 + (len(directories) * 100)
duration = 50 + (len(directories) * 5)
directories.append({
"url": url,
"status": status,
"contentLength": content_length,
"contentType": DataGenerator.CONTENT_TYPES[i % len(DataGenerator.CONTENT_TYPES)],
"contentType": DataGenerator.CONTENT_TYPES[len(directories) % len(DataGenerator.CONTENT_TYPES)],
"duration": duration,
})
i += 1
return directories
@@ -489,19 +560,19 @@ class DataGenerator:
def generate_vulnerabilities(target: Dict[str, Any], count: int) -> List[Dict[str, Any]]:
"""
Generate vulnerability data for a target.
Args:
target: Target data (must have 'name' and 'type')
count: Number of vulnerabilities to generate
Returns:
List of vulnerability data dictionaries with camelCase fields
"""
vulnerabilities = []
for i in range(count):
path = DataGenerator.VULN_PATHS[i % len(DataGenerator.VULN_PATHS)]
# Generate URL based on target type
if target["type"] == "domain":
url = f"https://www.{target['name']}{path}"
@@ -512,9 +583,9 @@ class DataGenerator:
url = f"https://{base_ip}{path}"
else:
continue
cvss_score = DataGenerator.CVSS_SCORES[i % len(DataGenerator.CVSS_SCORES)]
vulnerabilities.append({
"url": url,
"vulnType": DataGenerator.VULN_TYPES[i % len(DataGenerator.VULN_TYPES)],
@@ -523,5 +594,237 @@ class DataGenerator:
"cvssScore": cvss_score,
"description": DataGenerator.DESCRIPTIONS[i % len(DataGenerator.DESCRIPTIONS)],
})
return vulnerabilities
# Screenshot data templates
SCREENSHOT_STATUS_CODES = [200, 200, 200, 200, 301, 302, 403, 404]
@staticmethod
def generate_screenshots(target: Dict[str, Any], count: int) -> List[Dict[str, Any]]:
"""
Generate screenshot data for a target.
Args:
target: Target data (must have 'name' and 'type')
count: Number of screenshots to generate
Returns:
List of screenshot data dictionaries with camelCase fields
"""
screenshots = []
used_urls = set()
max_combinations = (
len(DataGenerator.PROTOCOLS) *
len(DataGenerator.SUBDOMAINS) *
len(DataGenerator.PORTS)
)
actual_count = min(count, max_combinations)
i = 0
attempts = 0
max_attempts = actual_count * 10
while len(screenshots) < actual_count and attempts < max_attempts:
attempts += 1
protocol_idx = i % len(DataGenerator.PROTOCOLS)
subdomain_idx = (i // len(DataGenerator.PROTOCOLS)) % len(DataGenerator.SUBDOMAINS)
port_idx = (
i // (len(DataGenerator.PROTOCOLS) * len(DataGenerator.SUBDOMAINS))
) % len(DataGenerator.PORTS)
protocol = DataGenerator.PROTOCOLS[protocol_idx]
subdomain = DataGenerator.SUBDOMAINS[subdomain_idx]
port = DataGenerator.PORTS[port_idx]
# Generate URL based on target type
if target["type"] == "domain":
url = f"{protocol}{subdomain}.{target['name']}{port}"
elif target["type"] == "ip":
url = f"{protocol}{target['name']}{port}"
elif target["type"] == "cidr":
base_ip = target["name"].split("/")[0]
url = f"{protocol}{base_ip}{port}"
else:
i += 1
continue
if url in used_urls:
i += 1
continue
used_urls.add(url)
status_code = DataGenerator.SCREENSHOT_STATUS_CODES[
len(screenshots) % len(DataGenerator.SCREENSHOT_STATUS_CODES)
]
screenshots.append({
"url": url,
"statusCode": status_code,
# Note: image field is optional, omitted for seed data
})
i += 1
return screenshots
# Scan data templates
ENGINE_IDS = [1, 2, 3, 4, 5]
ENGINE_NAMES = [
"subdomain_discovery",
"port_scan",
"web_crawl",
"vulnerability_scan",
"screenshot_capture",
]
SCAN_CONFIGURATIONS = [
"subdomain_discovery:\\n passive_tools:\\n subfinder:\\n enabled: true",
"port_scan:\\n nmap:\\n enabled: true\\n ports: top-1000",
"web_crawl:\\n httpx:\\n enabled: true",
"vulnerability_scan:\\n nuclei:\\n enabled: true",
]
@staticmethod
def generate_scan(target: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate scan data for a target.
Args:
target: Target data (must have 'id')
Returns:
Scan initiate request data
"""
engine_count = random.randint(1, 3)
engine_indices = random.sample(range(len(DataGenerator.ENGINE_IDS)), engine_count)
return {
"targetId": target["id"],
"engineIds": [DataGenerator.ENGINE_IDS[i] for i in engine_indices],
"engineNames": [DataGenerator.ENGINE_NAMES[i] for i in engine_indices],
"configuration": random.choice(DataGenerator.SCAN_CONFIGURATIONS),
}
@staticmethod
def generate_subdomain_snapshots(target: Dict[str, Any], count: int) -> List[Dict[str, Any]]:
"""
Generate subdomain snapshot data for a domain target.
Args:
target: Target data (must be type 'domain')
count: Number of subdomains to generate
Returns:
List of subdomain snapshot items
"""
if target["type"] != "domain":
return []
subdomains = []
target_name = target['name']
parts = target_name.split('.')
if len(parts) > 2:
base_domain = '.'.join(parts[1:])
else:
base_domain = target_name
for i in range(count):
prefix = DataGenerator.SUBDOMAIN_PREFIXES[i % len(DataGenerator.SUBDOMAIN_PREFIXES)]
name = f"{prefix}.{base_domain}"
if name == target_name:
continue
subdomains.append({"name": name})
return subdomains
@staticmethod
def generate_host_port_snapshots(target: Dict[str, Any], count: int) -> List[Dict[str, Any]]:
"""
Generate host port snapshot data for a target.
Args:
target: Target data
count: Number of host port mappings to generate
Returns:
List of host port snapshot items
"""
host_ports = []
base_ip1 = random.randint(1, 223)
base_ip2 = random.randint(0, 255)
base_ip3 = random.randint(0, 255)
for i in range(count):
ip = f"{base_ip1}.{base_ip2}.{base_ip3}.{(i % 254) + 1}"
if target["type"] == "domain":
target_name = target["name"]
parts = target_name.split('.')
if len(parts) > 2:
host = '.'.join(parts[1:])
else:
host = target_name
elif target["type"] == "ip":
host = target["name"]
elif target["type"] == "cidr":
host = target["name"].split("/")[0]
else:
continue
port = DataGenerator.COMMON_PORTS[i % len(DataGenerator.COMMON_PORTS)]
host_ports.append({
"host": host,
"ip": ip,
"port": port,
})
return host_ports
@staticmethod
def generate_vulnerability_snapshots(target: Dict[str, Any], count: int) -> List[Dict[str, Any]]:
"""
Generate vulnerability snapshot data for a target.
Args:
target: Target data
count: Number of vulnerabilities to generate
Returns:
List of vulnerability snapshot items with required severity field
"""
vulnerabilities = []
for i in range(count):
path = DataGenerator.VULN_PATHS[i % len(DataGenerator.VULN_PATHS)]
if target["type"] == "domain":
url = f"https://www.{target['name']}{path}"
elif target["type"] == "ip":
url = f"https://{target['name']}{path}"
elif target["type"] == "cidr":
base_ip = target["name"].split("/")[0]
url = f"https://{base_ip}{path}"
else:
continue
cvss_score = DataGenerator.CVSS_SCORES[i % len(DataGenerator.CVSS_SCORES)]
vulnerabilities.append({
"url": url,
"vulnType": DataGenerator.VULN_TYPES[i % len(DataGenerator.VULN_TYPES)],
"severity": DataGenerator.SEVERITIES[i % len(DataGenerator.SEVERITIES)],
"source": DataGenerator.SOURCES[i % len(DataGenerator.SOURCES)],
"cvssScore": cvss_score,
"description": DataGenerator.DESCRIPTIONS[i % len(DataGenerator.DESCRIPTIONS)],
})
return vulnerabilities

View File

@@ -156,7 +156,12 @@ def main():
# Create assets
create_assets(client, progress, error_handler, targets, args.assets_per_target, args.batch_size)
# Create scans and snapshots
scan_ids = create_scans(client, progress, error_handler, targets)
if scan_ids:
create_snapshots(client, progress, error_handler, targets, scan_ids, args.assets_per_target, args.batch_size)
# Print summary
progress.print_summary()
@@ -188,6 +193,8 @@ def clear_data(client, progress):
# Delete in correct order (child tables first)
delete_operations = [
("scans", "/api/scans/bulk-delete"),
("screenshots", "/api/screenshots/bulk-delete"),
("vulnerabilities", "/api/vulnerabilities/bulk-delete"),
("host ports", "/api/host-ports/bulk-delete"),
("directories", "/api/directories/bulk-delete"),
@@ -522,32 +529,314 @@ def create_assets(client, progress, error_handler, targets, assets_per_target, b
error_handler.log_error(str(e), {"targetId": target["id"], "mappings": batch})
progress.finish_phase()
# Create vulnerabilities (temporarily disabled - API not fully implemented)
# progress.start_phase("Creating vulnerabilities", len(targets) * assets_per_target, "🔓")
#
# for target in targets:
# vulnerabilities = DataGenerator.generate_vulnerabilities(target, assets_per_target)
#
# # Batch create
# for i in range(0, len(vulnerabilities), batch_size):
# batch = vulnerabilities[i:i + batch_size]
#
# try:
# error_handler.retry_with_backoff(
# client.post,
# f"/api/targets/{target['id']}/vulnerabilities/bulk-create",
# {"vulnerabilities": batch}
# )
#
# progress.add_success(len(batch))
# progress.update(progress.current_count + len(batch))
#
# except Exception as e:
# progress.add_error(str(e))
# error_handler.log_error(str(e), {"targetId": target["id"], "vulnerabilities": batch})
#
# progress.finish_phase()
# Create vulnerabilities
progress.start_phase("Creating vulnerabilities", len(targets) * assets_per_target, "🔓")
for target in targets:
vulnerabilities = DataGenerator.generate_vulnerabilities(target, assets_per_target)
# Batch create
for i in range(0, len(vulnerabilities), batch_size):
batch = vulnerabilities[i:i + batch_size]
try:
error_handler.retry_with_backoff(
client.post,
f"/api/targets/{target['id']}/vulnerabilities/bulk-create",
{"vulnerabilities": batch}
)
progress.add_success(len(batch))
progress.update(progress.current_count + len(batch))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(
str(e),
{"targetId": target["id"], "vulnerabilities": batch}
)
progress.finish_phase()
# Create screenshots
progress.start_phase("Creating screenshots", len(targets) * assets_per_target, "📸")
for target in targets:
screenshots = DataGenerator.generate_screenshots(target, assets_per_target)
# Batch upsert
for i in range(0, len(screenshots), batch_size):
batch = screenshots[i:i + batch_size]
try:
error_handler.retry_with_backoff(
client.post,
f"/api/targets/{target['id']}/screenshots/bulk-upsert",
{"screenshots": batch}
)
progress.add_success(len(batch))
progress.update(progress.current_count + len(batch))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(
str(e),
{"targetId": target["id"], "screenshots": batch}
)
progress.finish_phase()
def create_scans(client, progress, error_handler, targets):
"""
Create scans for targets.
Args:
client: API client
progress: Progress tracker
error_handler: Error handler
targets: List of target dictionaries
Returns:
Dict mapping target_id to scan_id
"""
from data_generator import DataGenerator
progress.start_phase("Creating scans", len(targets), "🔍")
scan_ids = {}
for target in targets:
try:
scan_data = DataGenerator.generate_scan(target)
result = error_handler.retry_with_backoff(
client.post,
"/api/scans/initiate",
scan_data
)
if result.get("scans") and len(result["scans"]) > 0:
scan_ids[target["id"]] = result["scans"][0]["id"]
progress.add_success(1)
progress.update(len(scan_ids))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(str(e), {"targetId": target["id"]})
progress.finish_phase()
return scan_ids
def create_snapshots(client, progress, error_handler, targets, scan_ids, assets_per_target, batch_size):
"""
Create snapshot data for scans.
Args:
client: API client
progress: Progress tracker
error_handler: Error handler
targets: List of target dictionaries
scan_ids: Dict mapping target_id to scan_id
assets_per_target: Number of assets per target
batch_size: Batch size for bulk operations
"""
from data_generator import DataGenerator
# Filter targets that have scans
targets_with_scans = [t for t in targets if t["id"] in scan_ids]
if not targets_with_scans:
return
# Create website snapshots
progress.start_phase("Creating website snapshots", len(targets_with_scans) * assets_per_target, "🌐")
for target in targets_with_scans:
scan_id = scan_ids[target["id"]]
websites = DataGenerator.generate_websites(target, assets_per_target)
for i in range(0, len(websites), batch_size):
batch = websites[i:i + batch_size]
try:
error_handler.retry_with_backoff(
client.post,
f"/api/scans/{scan_id}/websites/bulk-upsert",
{"targetId": target["id"], "websites": batch}
)
progress.add_success(len(batch))
progress.update(progress.current_count + len(batch))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(str(e), {"scanId": scan_id, "websites": batch})
progress.finish_phase()
# Create subdomain snapshots (only for domain targets)
domain_targets = [t for t in targets_with_scans if t["type"] == "domain"]
if domain_targets:
progress.start_phase("Creating subdomain snapshots", len(domain_targets) * assets_per_target, "📝")
for target in domain_targets:
scan_id = scan_ids[target["id"]]
subdomains = DataGenerator.generate_subdomain_snapshots(target, assets_per_target)
if not subdomains:
continue
try:
error_handler.retry_with_backoff(
client.post,
f"/api/scans/{scan_id}/subdomains/bulk-upsert",
{"targetId": target["id"], "subdomains": subdomains}
)
progress.add_success(len(subdomains))
progress.update(progress.current_count + len(subdomains))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(str(e), {"scanId": scan_id, "subdomains": subdomains})
progress.finish_phase()
# Create endpoint snapshots
progress.start_phase("Creating endpoint snapshots", len(targets_with_scans) * assets_per_target, "🔗")
for target in targets_with_scans:
scan_id = scan_ids[target["id"]]
endpoints = DataGenerator.generate_endpoints(target, assets_per_target)
for i in range(0, len(endpoints), batch_size):
batch = endpoints[i:i + batch_size]
try:
error_handler.retry_with_backoff(
client.post,
f"/api/scans/{scan_id}/endpoints/bulk-upsert",
{"targetId": target["id"], "endpoints": batch}
)
progress.add_success(len(batch))
progress.update(progress.current_count + len(batch))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(str(e), {"scanId": scan_id, "endpoints": batch})
progress.finish_phase()
# Create directory snapshots
progress.start_phase("Creating directory snapshots", len(targets_with_scans) * assets_per_target, "📁")
for target in targets_with_scans:
scan_id = scan_ids[target["id"]]
directories = DataGenerator.generate_directories(target, assets_per_target)
for i in range(0, len(directories), batch_size):
batch = directories[i:i + batch_size]
try:
error_handler.retry_with_backoff(
client.post,
f"/api/scans/{scan_id}/directories/bulk-upsert",
{"targetId": target["id"], "directories": batch}
)
progress.add_success(len(batch))
progress.update(progress.current_count + len(batch))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(str(e), {"scanId": scan_id, "directories": batch})
progress.finish_phase()
# Create host port snapshots
progress.start_phase("Creating host port snapshots", len(targets_with_scans) * assets_per_target, "🔌")
for target in targets_with_scans:
scan_id = scan_ids[target["id"]]
host_ports = DataGenerator.generate_host_port_snapshots(target, assets_per_target)
for i in range(0, len(host_ports), batch_size):
batch = host_ports[i:i + batch_size]
try:
error_handler.retry_with_backoff(
client.post,
f"/api/scans/{scan_id}/host-ports/bulk-upsert",
{"targetId": target["id"], "hostPorts": batch}
)
progress.add_success(len(batch))
progress.update(progress.current_count + len(batch))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(str(e), {"scanId": scan_id, "hostPorts": batch})
progress.finish_phase()
# Create screenshot snapshots
progress.start_phase("Creating screenshot snapshots", len(targets_with_scans) * assets_per_target, "📸")
for target in targets_with_scans:
scan_id = scan_ids[target["id"]]
screenshots = DataGenerator.generate_screenshots(target, assets_per_target)
for i in range(0, len(screenshots), batch_size):
batch = screenshots[i:i + batch_size]
try:
error_handler.retry_with_backoff(
client.post,
f"/api/scans/{scan_id}/screenshots/bulk-upsert",
{"targetId": target["id"], "screenshots": batch}
)
progress.add_success(len(batch))
progress.update(progress.current_count + len(batch))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(str(e), {"scanId": scan_id, "screenshots": batch})
progress.finish_phase()
# Create vulnerability snapshots
progress.start_phase("Creating vulnerability snapshots", len(targets_with_scans) * assets_per_target, "🔓")
for target in targets_with_scans:
scan_id = scan_ids[target["id"]]
vulnerabilities = DataGenerator.generate_vulnerability_snapshots(target, assets_per_target)
for i in range(0, len(vulnerabilities), batch_size):
batch = vulnerabilities[i:i + batch_size]
try:
error_handler.retry_with_backoff(
client.post,
f"/api/scans/{scan_id}/vulnerabilities/bulk-create",
{"vulnerabilities": batch}
)
progress.add_success(len(batch))
progress.update(progress.current_count + len(batch))
except Exception as e:
progress.add_error(str(e))
error_handler.log_error(str(e), {"scanId": scan_id, "vulnerabilities": batch})
progress.finish_phase()
if __name__ == "__main__":