Replaced prologue implementation with mummy for listener management, since it seems more suitable for future use (websockets, etc.).
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import prologue, terminal, strformat, parsetoml, tables
|
||||
import mummy, terminal, strformat, parsetoml, tables
|
||||
import strutils, base64
|
||||
|
||||
import ./handlers
|
||||
@@ -6,15 +6,32 @@ import ../globals
|
||||
import ../core/logger
|
||||
import ../../common/[types, utils, serialize, profile]
|
||||
|
||||
proc error404*(ctx: Context) {.async.} =
|
||||
resp "", Http404
|
||||
# Not Found
|
||||
proc error404*(request: Request) =
|
||||
request.respond(404, body = "")
|
||||
|
||||
# Method not allowed
|
||||
proc error405*(request: Request) =
|
||||
request.respond(404, body = "")
|
||||
|
||||
# Utils
|
||||
proc hasKey(headers: seq[(string, string)], headerName: string): bool =
|
||||
for (name, value) in headers:
|
||||
if name.toLower() == headerName.toLower():
|
||||
return true
|
||||
return false
|
||||
|
||||
proc get(headers: seq[(string, string)], headerName: string): string =
|
||||
for (name, value) in headers:
|
||||
if name.toLower() == headerName.toLower():
|
||||
return value
|
||||
return ""
|
||||
|
||||
#[
|
||||
GET
|
||||
Called from agent to check for new tasks
|
||||
]#
|
||||
proc httpGet*(ctx: Context) {.async.} =
|
||||
|
||||
proc httpGet*(request: Request) =
|
||||
{.cast(gcsafe).}:
|
||||
|
||||
# Check heartbeat metadata placement
|
||||
@@ -24,17 +41,16 @@ proc httpGet*(ctx: Context) {.async.} =
|
||||
case cq.profile.getString("http-get.agent.heartbeat.placement.type"):
|
||||
of "header":
|
||||
let heartbeatHeader = cq.profile.getString("http-get.agent.heartbeat.placement.name")
|
||||
if not ctx.request.hasHeader(heartbeatHeader):
|
||||
resp "", Http404
|
||||
if not request.headers.hasKey(heartbeatHeader):
|
||||
request.respond(404, body = "")
|
||||
return
|
||||
|
||||
heartbeatString = ctx.request.getHeader(heartbeatHeader)[0]
|
||||
heartbeatString = request.headers.get(heartbeatHeader)
|
||||
|
||||
of "parameter":
|
||||
let param = cq.profile.getString("http-get.agent.heartbeat.placement.name")
|
||||
heartbeatString = ctx.getQueryParams(param)
|
||||
heartbeatString = request.queryParams.get(param)
|
||||
if heartbeatString.len <= 0:
|
||||
resp "", Http404
|
||||
request.respond(404, body = "")
|
||||
return
|
||||
|
||||
of "uri":
|
||||
@@ -60,7 +76,7 @@ proc httpGet*(ctx: Context) {.async.} =
|
||||
let tasks: seq[seq[byte]] = getTasks(heartbeat)
|
||||
|
||||
if tasks.len <= 0:
|
||||
resp "", Http200
|
||||
request.respond(200, body = "")
|
||||
return
|
||||
|
||||
# Create response, containing number of tasks, as well as length and content of each task
|
||||
@@ -73,7 +89,6 @@ proc httpGet*(ctx: Context) {.async.} =
|
||||
|
||||
# Apply data transformation to the response
|
||||
var response: string
|
||||
|
||||
case cq.profile.getString("http-get.server.output.encoding.type", default = "none"):
|
||||
of "none":
|
||||
response = Bytes.toString(responseBytes)
|
||||
@@ -85,52 +100,52 @@ proc httpGet*(ctx: Context) {.async.} =
|
||||
let suffix = cq.profile.getString("http-get.server.output.suffix")
|
||||
|
||||
# Add headers, as defined in the team server profile
|
||||
var headers: HttpHeaders
|
||||
for header, value in cq.profile.getTable("http-get.server.headers"):
|
||||
ctx.response.setHeader(header, value.getStringValue())
|
||||
headers.add((header, value.getStringValue()))
|
||||
|
||||
await ctx.respond(Http200, prefix & response & suffix, ctx.response.headers)
|
||||
ctx.handled = true # Ensure that HTTP response is sent only once
|
||||
request.respond(200, headers = headers, body = prefix & response & suffix)
|
||||
|
||||
# Notify operator that agent collected tasks
|
||||
cq.info(fmt"{$response.len} bytes sent.")
|
||||
|
||||
except CatchableError:
|
||||
resp "", Http404
|
||||
request.respond(404, body = "")
|
||||
|
||||
#[
|
||||
POST
|
||||
Called from agent to register itself or post results of a task
|
||||
]#
|
||||
proc httpPost*(ctx: Context) {.async.} =
|
||||
|
||||
proc httpPost*(request: Request) =
|
||||
{.cast(gcsafe).}:
|
||||
|
||||
# Check headers
|
||||
# If POST data is not binary data, return 404 error code
|
||||
if ctx.request.contentType != "application/octet-stream":
|
||||
resp "", Http404
|
||||
if request.headers.get("Content-Type") != "application/octet-stream":
|
||||
request.respond(404, body = "")
|
||||
return
|
||||
|
||||
try:
|
||||
# Differentiate between registration and task result packet
|
||||
var unpacker = Unpacker.init(ctx.request.body)
|
||||
var unpacker = Unpacker.init(request.body)
|
||||
let header = unpacker.deserializeHeader()
|
||||
|
||||
# Add response headers, as defined in team server profile
|
||||
var headers: HttpHeaders
|
||||
for header, value in cq.profile.getTable("http-post.server.headers"):
|
||||
ctx.response.setHeader(header, value.getStringValue())
|
||||
headers.add((header, value.getStringValue()))
|
||||
|
||||
if cast[PacketType](header.packetType) == MSG_REGISTER:
|
||||
if not register(string.toBytes(ctx.request.body)):
|
||||
resp "", Http400
|
||||
if not register(string.toBytes(request.body)):
|
||||
request.respond(400, body = "")
|
||||
return
|
||||
|
||||
elif cast[PacketType](header.packetType) == MSG_RESULT:
|
||||
handleResult(string.toBytes(ctx.request.body))
|
||||
handleResult(string.toBytes(request.body))
|
||||
|
||||
resp "", Http200
|
||||
request.respond(200, body = "")
|
||||
|
||||
except CatchableError:
|
||||
resp "", Http404
|
||||
request.respond(404, body = "")
|
||||
|
||||
return
|
||||
Reference in New Issue
Block a user