feat(search): implement advanced query parser with expression syntax support

- Add SearchQueryParser class to parse complex search expressions with operators (=, ==, !=)
- Support logical operators && (AND) and || (OR) for combining multiple conditions
- Implement field mapping for frontend to database field translation
- Add support for array field searching (tech stack) with unnest and ANY operators
- Support fuzzy matching (=), exact matching (==), and negation (!=) operators
- Add proper SQL injection prevention through parameterized queries
- Refactor search service to use expression-based filtering instead of simple filters
- Update search views to integrate new query parser
- Enhance frontend search hook and service to support new expression syntax
- Update search types to reflect new query structure
- Improve search page UI to display expression syntax examples and help text
- Enable complex multi-condition searches like: host="api" && tech="nginx" || status=="200"
This commit is contained in:
yyhuni
2026-01-02 17:46:31 +08:00
parent 23bc463283
commit 18cc016268
6 changed files with 363 additions and 224 deletions

View File

@@ -3,85 +3,271 @@
提供资产搜索的核心业务逻辑:
- 从物化视图查询数据
- 支持多条件组合过滤
- 支持模糊匹配
- 支持表达式语法解析
- 支持 =(模糊)、==(精确)、!=(不等于)操作符
- 支持 && (AND) 和 || (OR) 逻辑组合
"""
import logging
from typing import Optional, List, Dict, Any
import re
from typing import Optional, List, Dict, Any, Tuple
from django.db import connection
logger = logging.getLogger(__name__)

# Supported search fields: name used in the query expression -> column name
# used in the generated SQL.
FIELD_MAPPING = {
    'host': 'host',
    'url': 'url',
    'title': 'title',
    'tech': 'tech',
    'status': 'status_code',
    'body': 'response_body',
    'header': 'response_headers',
}

# Query fields whose column is an array and therefore needs unnest()/ANY().
ARRAY_FIELDS = {'tech'}


class SearchQueryParser:
    """
    Parser that turns a search expression into a parameterized SQL WHERE clause.

    Supported syntax:
    - field="value"     fuzzy match (ILIKE %value%)
    - field=="value"    exact match
    - field!="value"    not equal
    - &&                AND (binds tighter than ||)
    - ||                OR
    - ( ... )           grouping, may be nested

    Examples:
    - host="api" && tech="nginx"
    - tech="vue" || tech="react"
    - status=="200" && host!="test"
    - host="api" && (tech="nginx" || tech="apache")

    User-supplied values are never interpolated into the SQL text; they are
    always returned as bind parameters. Only column names taken from
    FIELD_MAPPING are formatted into the clause.
    """

    # One condition: field="value", field=="value" or field!="value".
    # '==' and '!=' are listed before '=' so the longer operators win.
    CONDITION_PATTERN = re.compile(r'(\w+)\s*(==|!=|=)\s*"([^"]*)"')

    @classmethod
    def parse(cls, query: str) -> Tuple[str, List[Any]]:
        """
        Parse a query string into a SQL WHERE clause and its parameters.

        Conditions that cannot be parsed (bad syntax or unknown field) are
        logged and dropped. When nothing usable remains, the always-true
        clause "1=1" is returned.

        Args:
            query: search expression string

        Returns:
            (where_clause, params) tuple
        """
        if not query or not query.strip():
            return "1=1", []

        # Split on top-level || first so that && binds tighter than ||.
        or_groups = cls._split_by_or(query.strip())

        if len(or_groups) == 1:
            # No top-level OR: the whole query is a single AND group.
            return cls._parse_and_group(or_groups[0])

        or_clauses = []
        all_params = []
        for group in or_groups:
            clause, params = cls._parse_and_group(group)
            if clause and clause != "1=1":
                or_clauses.append(f"({clause})")
                all_params.extend(params)

        if not or_clauses:
            return "1=1", []
        return " OR ".join(or_clauses), all_params

    @classmethod
    def _split_top_level(cls, query: str, separator: str) -> List[str]:
        """
        Split query on separator, ignoring occurrences inside double quotes
        or inside parentheses.

        Empty pieces are discarded. If no piece remains, the original query
        is returned as the only element.
        """
        parts = []
        current = []
        in_quotes = False
        depth = 0
        i = 0
        while i < len(query):
            char = query[i]
            if char == '"':
                in_quotes = not in_quotes
            elif not in_quotes:
                if char == '(':
                    depth += 1
                elif char == ')' and depth > 0:
                    # An unmatched ')' is treated as an ordinary character.
                    depth -= 1
                elif depth == 0 and query.startswith(separator, i):
                    piece = "".join(current).strip()
                    if piece:
                        parts.append(piece)
                    current = []
                    i += len(separator)
                    continue
            current.append(char)
            i += 1

        tail = "".join(current).strip()
        if tail:
            parts.append(tail)
        return parts if parts else [query]

    @classmethod
    def _strip_outer_parens(cls, text: str) -> str:
        """
        Remove parentheses that enclose the whole text, repeatedly.

        "(a && b)" becomes "a && b", but "(a) && (b)" is returned unchanged
        because its first "(" is closed before the end of the text.
        """
        text = text.strip()
        while text.startswith('(') and text.endswith(')'):
            depth = 0
            in_quotes = False
            close_index = -1
            for index, char in enumerate(text):
                if char == '"':
                    in_quotes = not in_quotes
                elif in_quotes:
                    continue
                elif char == '(':
                    depth += 1
                elif char == ')':
                    depth -= 1
                    if depth == 0:
                        close_index = index
                        break
            if close_index != len(text) - 1:
                # The leading '(' does not pair with the trailing ')'.
                break
            text = text[1:-1].strip()
        return text

    @classmethod
    def _split_by_or(cls, query: str) -> List[str]:
        """Split on ||, ignoring || inside quotes or parentheses."""
        return cls._split_top_level(query, '||')

    @classmethod
    def _parse_and_group(cls, group: str) -> Tuple[str, List[Any]]:
        """
        Parse an AND group (conditions joined by &&).

        Returns:
            (where_clause, params); ("1=1", []) when no condition is usable
        """
        group = cls._strip_outer_parens(group)

        # Removing the enclosing parentheses can expose a top-level OR,
        # e.g. "(a || b)"; that must be parsed as a full expression.
        if len(cls._split_by_or(group)) > 1:
            return cls.parse(group)

        and_clauses = []
        all_params = []
        for part in cls._split_by_and(group):
            clause, params = cls._parse_condition(part.strip())
            if clause:
                and_clauses.append(clause)
                all_params.extend(params)

        if not and_clauses:
            return "1=1", []
        return " AND ".join(and_clauses), all_params

    @classmethod
    def _split_by_and(cls, query: str) -> List[str]:
        """Split on &&, ignoring && inside quotes or parentheses."""
        return cls._split_top_level(query, '&&')

    @classmethod
    def _parse_condition(cls, condition: str) -> Tuple[Optional[str], List[Any]]:
        """
        Parse a single condition or a parenthesised sub-expression.

        Returns:
            (sql_clause, params), or (None, []) when parsing fails
        """
        condition = condition.strip()
        inner = cls._strip_outer_parens(condition)
        if inner != condition:
            is_compound = (
                len(cls._split_by_or(inner)) > 1
                or len(cls._split_by_and(inner)) > 1
            )
            if is_compound:
                # Parenthesised group: parse recursively and keep it wrapped
                # so it stays a single operand of the surrounding AND.
                clause, params = cls.parse(inner)
                if clause == "1=1":
                    return None, []
                return f"({clause})", params
            condition = inner

        match = cls.CONDITION_PATTERN.match(condition)
        if not match:
            logger.warning(f"无法解析条件: {condition}")
            return None, []

        field, operator, value = match.groups()
        field = field.lower()

        # Only whitelisted fields may reach the SQL text.
        if field not in FIELD_MAPPING:
            logger.warning(f"未知字段: {field}")
            return None, []

        db_field = FIELD_MAPPING[field]
        is_array = field in ARRAY_FIELDS

        # CONDITION_PATTERN only ever captures one of these three operators.
        builders = {
            '=': cls._build_like_condition,
            '==': cls._build_exact_condition,
            '!=': cls._build_not_equal_condition,
        }
        return builders[operator](db_field, value, is_array)

    @classmethod
    def _build_like_condition(cls, field: str, value: str, is_array: bool) -> Tuple[str, List[Any]]:
        """
        Build a fuzzy (substring, case-insensitive) match condition.

        NOTE: '%' and '_' inside value are passed through unescaped and
        therefore act as LIKE wildcards.
        """
        pattern = f"%{value}%"
        if is_array:
            # Array column: true when any element contains the value.
            return f"EXISTS (SELECT 1 FROM unnest({field}) AS t WHERE t ILIKE %s)", [pattern]
        if field == 'status_code':
            # The status code is an integer; ILIKE needs a text operand.
            return f"{field}::text ILIKE %s", [pattern]
        return f"{field} ILIKE %s", [pattern]

    @classmethod
    def _build_exact_condition(cls, field: str, value: str, is_array: bool) -> Tuple[str, List[Any]]:
        """Build an exact match condition."""
        if is_array:
            # Array column: true when the array contains exactly this value.
            return f"%s = ANY({field})", [value]
        if field == 'status_code':
            # The status code is an integer; compare numerically when the
            # value is numeric, otherwise fall back to a text comparison.
            try:
                return f"{field} = %s", [int(value)]
            except ValueError:
                return f"{field}::text = %s", [value]
        return f"{field} = %s", [value]

    @classmethod
    def _build_not_equal_condition(cls, field: str, value: str, is_array: bool) -> Tuple[str, List[Any]]:
        """Build a not-equal condition; NULL scalar columns count as not equal."""
        if is_array:
            # Array column: true when the array does not contain the value.
            return f"NOT (%s = ANY({field}))", [value]
        if field == 'status_code':
            try:
                return f"({field} IS NULL OR {field} != %s)", [int(value)]
            except ValueError:
                return f"({field} IS NULL OR {field}::text != %s)", [value]
        return f"({field} IS NULL OR {field} != %s)", [value]
class AssetSearchService:
"""资产搜索服务"""
def search(
self,
host: Optional[str] = None,
title: Optional[str] = None,
tech: Optional[str] = None,
status: Optional[str] = None,
body: Optional[str] = None,
header: Optional[str] = None,
url: Optional[str] = None,
) -> List[Dict[str, Any]]:
def search(self, query: str) -> List[Dict[str, Any]]:
"""
搜索资产
Args:
host: 主机名模糊匹配
title: 标题模糊匹配
tech: 技术栈匹配
status: 状态码匹配(支持逗号分隔多值)
body: 响应体模糊匹配
header: 响应头模糊匹配
url: URL 模糊匹配
query: 搜索查询字符串
Returns:
List[Dict]: 搜索结果列表
"""
# 构建查询条件
conditions = []
params = []
if host:
conditions.append("host ILIKE %s")
params.append(f"%{host}%")
if title:
conditions.append("title ILIKE %s")
params.append(f"%{title}%")
if tech:
# 技术栈数组模糊匹配(数组中任意元素包含搜索词)
conditions.append("EXISTS (SELECT 1 FROM unnest(tech) AS t WHERE t ILIKE %s)")
params.append(f"%{tech}%")
if status:
# 支持多状态码,逗号分隔
status_codes = [s.strip() for s in status.split(',') if s.strip().isdigit()]
if status_codes:
placeholders = ','.join(['%s'] * len(status_codes))
conditions.append(f"status_code IN ({placeholders})")
params.extend([int(s) for s in status_codes])
if body:
conditions.append("response_body ILIKE %s")
params.append(f"%{body}%")
if header:
conditions.append("response_headers ILIKE %s")
params.append(f"%{header}%")
if url:
conditions.append("url ILIKE %s")
params.append(f"%{url}%")
# 构建 SQL
where_clause = " AND ".join(conditions) if conditions else "1=1"
where_clause, params = SearchQueryParser.parse(query)
sql = f"""
SELECT
@@ -115,65 +301,20 @@ class AssetSearchService:
return results
except Exception as e:
logger.error(f"搜索查询失败: {e}")
logger.error(f"搜索查询失败: {e}, SQL: {sql}, params: {params}")
raise
def count(
self,
host: Optional[str] = None,
title: Optional[str] = None,
tech: Optional[str] = None,
status: Optional[str] = None,
body: Optional[str] = None,
header: Optional[str] = None,
url: Optional[str] = None,
) -> int:
def count(self, query: str) -> int:
"""
统计搜索结果数量
Args:
与 search 方法相同
query: 搜索查询字符串
Returns:
int: 结果总数
"""
# 构建查询条件
conditions = []
params = []
if host:
conditions.append("host ILIKE %s")
params.append(f"%{host}%")
if title:
conditions.append("title ILIKE %s")
params.append(f"%{title}%")
if tech:
# 技术栈数组模糊匹配
conditions.append("EXISTS (SELECT 1 FROM unnest(tech) AS t WHERE t ILIKE %s)")
params.append(f"%{tech}%")
if status:
status_codes = [s.strip() for s in status.split(',') if s.strip().isdigit()]
if status_codes:
placeholders = ','.join(['%s'] * len(status_codes))
conditions.append(f"status_code IN ({placeholders})")
params.extend([int(s) for s in status_codes])
if body:
conditions.append("response_body ILIKE %s")
params.append(f"%{body}%")
if header:
conditions.append("response_headers ILIKE %s")
params.append(f"%{header}%")
if url:
conditions.append("url ILIKE %s")
params.append(f"%{url}%")
where_clause = " AND ".join(conditions) if conditions else "1=1"
where_clause, params = SearchQueryParser.parse(query)
sql = f"SELECT COUNT(*) FROM asset_search_view WHERE {where_clause}"

View File

@@ -3,6 +3,22 @@
提供资产搜索的 REST API 接口:
- GET /api/assets/search/ - 搜索资产
搜索语法:
- field="value" 模糊匹配(ILIKE %value%)
- field=="value" 精确匹配
- field!="value" 不等于
- && AND 连接
- || OR 连接
支持的字段:
- host: 主机名
- url: URL
- title: 标题
- tech: 技术栈
- status: 状态码
- body: 响应体
- header: 响应头
"""
import logging
@@ -25,16 +41,15 @@ class AssetSearchView(APIView):
GET /api/assets/search/
Query Parameters:
host: 主机名模糊匹配
title: 标题模糊匹配
tech: 技术栈匹配
status: 状态码匹配(支持逗号分隔多值,如 "200,301"
body: 响应体模糊匹配
header: 响应头模糊匹配
url: URL 模糊匹配
q: 搜索查询表达式
page: 页码(从 1 开始,默认 1)
pageSize: 每页数量(默认 10,最大 100)
示例查询:
?q=host="api" && tech="nginx"
?q=tech="vue" || tech="react"
?q=status=="200" && host!="test"
Response:
{
"results": [...],
@@ -51,20 +66,13 @@ class AssetSearchView(APIView):
def get(self, request: Request):
"""搜索资产"""
# 获取搜索参数
host = request.query_params.get('host', '').strip() or None
title = request.query_params.get('title', '').strip() or None
tech = request.query_params.get('tech', '').strip() or None
status_param = request.query_params.get('status', '').strip() or None
body = request.query_params.get('body', '').strip() or None
header = request.query_params.get('header', '').strip() or None
url = request.query_params.get('url', '').strip() or None
# 获取搜索查询
query = request.query_params.get('q', '').strip()
# 检查是否有搜索条件
if not any([host, title, tech, status_param, body, header, url]):
if not query:
return error_response(
code=ErrorCodes.VALIDATION_ERROR,
message='At least one search parameter is required',
message='Search query (q) is required',
status_code=status.HTTP_400_BAD_REQUEST
)
@@ -82,30 +90,14 @@ class AssetSearchView(APIView):
try:
# 获取总数
total = self.service.count(
host=host,
title=title,
tech=tech,
status=status_param,
body=body,
header=header,
url=url,
)
total = self.service.count(query)
# 计算分页
total_pages = (total + page_size - 1) // page_size if total > 0 else 1
offset = (page - 1) * page_size
# 获取搜索结果
all_results = self.service.search(
host=host,
title=title,
tech=tech,
status=status_param,
body=body,
header=header,
url=url,
)
all_results = self.service.search(query)
# 手动分页
results = all_results[offset:offset + page_size]
@@ -117,10 +109,8 @@ class AssetSearchView(APIView):
response_headers = {}
if result.get('response_headers'):
try:
# 尝试解析为 JSON
response_headers = json.loads(result['response_headers'])
except (json.JSONDecodeError, TypeError):
# 如果不是 JSON尝试解析为 HTTP 头格式
headers_str = result['response_headers']
for line in headers_str.split('\n'):
if ':' in line:
@@ -158,6 +148,6 @@ class AssetSearchView(APIView):
logger.exception("搜索失败")
return error_response(
code=ErrorCodes.SERVER_ERROR,
message='Search failed',
message=f'Search failed: {str(e)}',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)

View File

@@ -11,59 +11,53 @@ import { useAssetSearch } from "@/hooks/use-search"
import type { SearchParams, SearchState } from "@/types/search.types"
import { Alert, AlertDescription } from "@/components/ui/alert"
// 搜索示例
// 搜索示例 - 展示各种查询语法
const SEARCH_FILTER_EXAMPLES = [
'host="example.com"',
'title="后台管理"',
// 模糊匹配 (=)
'host="api"',
'title="Dashboard"',
'tech="nginx"',
'status="200"',
'body="password"',
'header="Server: nginx"',
// 精确匹配 (==)
'status=="200"',
'host=="admin.example.com"',
// 不等于 (!=)
'status!="404"',
'host!="test"',
// AND 组合 (&&)
'host="api" && status=="200"',
'tech="nginx" && title="Dashboard"',
'host="admin" && tech="php" && status=="200"',
// OR 组合 (||)
'tech="vue" || tech="react"',
'status=="200" || status=="301"',
'host="admin" || host="manage"',
// 混合查询
'host="api" && (tech="nginx" || tech="apache")',
'(status=="200" || status=="301") && tech="vue"',
'host="example" && status!="404" && tech="nginx"',
]
// 解析搜索查询字符串为参数对象
function parseSearchQuery(query: string): SearchParams {
const params: SearchParams = {}
// 匹配 key="value" 或 key=value 格式
const regex = /(\w+)\s*=\s*"?([^"&]+)"?/g
let match
while ((match = regex.exec(query)) !== null) {
const [, key, value] = match
const trimmedValue = value.trim()
switch (key.toLowerCase()) {
case 'host':
params.host = trimmedValue
break
case 'url':
params.url = trimmedValue
break
case 'title':
params.title = trimmedValue
break
case 'tech':
params.tech = trimmedValue
break
case 'status':
params.status = trimmedValue
break
case 'body':
params.body = trimmedValue
break
case 'header':
params.header = trimmedValue
break
}
// 验证搜索查询语法
function validateSearchQuery(query: string): { valid: boolean; error?: string } {
if (!query.trim()) {
return { valid: false, error: 'Query cannot be empty' }
}
// 如果没有匹配到任何字段,尝试作为 host 搜索
if (Object.keys(params).length === 0 && query.trim()) {
params.host = query.trim()
// 检查是否有未闭合的引号
const quoteCount = (query.match(/"/g) || []).length
if (quoteCount % 2 !== 0) {
return { valid: false, error: 'Unclosed quote detected' }
}
return params
// 检查基本语法field="value" 或 field=="value" 或 field!="value"
const conditionPattern = /(\w+)\s*(==|!=|=)\s*"([^"]*)"/g
const conditions = query.match(conditionPattern)
if (!conditions || conditions.length === 0) {
return { valid: false, error: 'Invalid syntax. Use: field="value", field=="value", or field!="value"' }
}
return { valid: true }
}
export function SearchPage() {
@@ -94,10 +88,17 @@ export function SearchPage() {
const handleSearch = useCallback((_filters: unknown, rawQuery: string) => {
if (!rawQuery.trim()) return
// 验证语法
const validation = validateSearchQuery(rawQuery)
if (!validation.valid) {
// 可以显示错误提示,这里简单处理
console.warn('Search validation:', validation.error)
}
setQuery(rawQuery)
const params = parseSearchQuery(rawQuery)
setSearchParams(params)
setPage(1) // 重置页码
// 直接将原始查询发送给后端解析
setSearchParams({ q: rawQuery })
setPage(1)
setSearchState("searching")
}, [])
@@ -112,7 +113,7 @@ export function SearchPage() {
const handlePageSizeChange = useCallback((newPageSize: number) => {
setPageSize(newPageSize)
setPage(1) // 重置页码
setPage(1)
}, [])
return (
@@ -136,7 +137,7 @@ export function SearchPage() {
<SmartFilterInput
fields={SEARCH_FILTER_FIELDS}
examples={SEARCH_FILTER_EXAMPLES}
placeholder='host="example.com" title="后台"'
placeholder='host="api" && tech="nginx" && status=="200"'
value={query}
onSearch={handleSearch}
className="w-full [&_input]:h-12 [&_input]:text-base [&_button]:h-12 [&_button]:w-12 [&_button]:p-0"
@@ -184,7 +185,7 @@ export function SearchPage() {
<SmartFilterInput
fields={SEARCH_FILTER_FIELDS}
examples={SEARCH_FILTER_EXAMPLES}
placeholder='host="example.com" title="后台"'
placeholder='host="api" && tech="nginx" && status=="200"'
value={query}
onSearch={handleSearch}
className="flex-1"

View File

@@ -13,16 +13,8 @@ export function useAssetSearch(
params: SearchParams,
options?: { enabled?: boolean }
) {
// 检查是否有有效的搜索条件
const hasSearchParams = !!(
params.host ||
params.title ||
params.tech ||
params.status ||
params.body ||
params.header ||
params.url
)
// 检查是否有有效的搜索查询
const hasSearchParams = !!(params.q && params.q.trim())
return useQuery<SearchResponse>({
queryKey: ['asset-search', params],

View File

@@ -3,6 +3,18 @@ import type { SearchParams, SearchResponse } from "@/types/search.types"
/**
* 资产搜索 API 服务
*
* 搜索语法:
* - field="value" 模糊匹配(ILIKE %value%)
* - field=="value" 精确匹配
* - field!="value" 不等于
* - && AND 连接
* - || OR 连接
*
* 示例:
* - host="api" && tech="nginx"
* - tech="vue" || tech="react"
* - status=="200" && host!="test"
*/
export class SearchService {
/**
@@ -10,16 +22,9 @@ export class SearchService {
* GET /api/assets/search/
*/
static async search(params: SearchParams): Promise<SearchResponse> {
// 构建查询参数,过滤空值
const queryParams = new URLSearchParams()
if (params.host) queryParams.append('host', params.host)
if (params.title) queryParams.append('title', params.title)
if (params.tech) queryParams.append('tech', params.tech)
if (params.status) queryParams.append('status', params.status)
if (params.body) queryParams.append('body', params.body)
if (params.header) queryParams.append('header', params.header)
if (params.url) queryParams.append('url', params.url)
if (params.q) queryParams.append('q', params.q)
if (params.page) queryParams.append('page', params.page.toString())
if (params.pageSize) queryParams.append('pageSize', params.pageSize.toString())

View File

@@ -30,15 +30,25 @@ export interface SearchResponse {
totalPages: number
}
// 搜索参数类型
// Search operator type: '=' fuzzy match, '==' exact match, '!=' not equal
export type SearchOperator = '=' | '==' | '!='
// A single search condition: one field compared with one value by an operator
export interface SearchCondition {
field: string
operator: SearchOperator
value: string
}
// A search expression (supports AND/OR combination)
// NOTE(review): SearchParams sends the raw expression string in `q`; confirm whether this structured form is used anywhere
export interface SearchExpression {
conditions: SearchCondition[] // conditions within the same group are joined with AND
orGroups?: SearchExpression[] // multiple groups are joined with OR
}
// 发送给后端的搜索参数
export interface SearchParams {
host?: string
title?: string
tech?: string
status?: string
body?: string
header?: string
url?: string
q?: string // 完整的搜索表达式字符串
page?: number
pageSize?: number
}