Agent fetches serialized task data from prologue web server and successfully parses it.
This commit is contained in:
@@ -3,28 +3,18 @@ import ./types
|
||||
|
||||
type
|
||||
Packer* = ref object
|
||||
headerStream: StringStream
|
||||
payloadStream: StringStream
|
||||
stream: StringStream
|
||||
|
||||
proc initTaskPacker*(): Packer =
|
||||
result = new Packer
|
||||
result.headerStream = newStringStream()
|
||||
result.payloadStream = newStringStream()
|
||||
result.stream = newStringStream()
|
||||
|
||||
proc addToHeader*[T: uint8 | uint16 | uint32 | uint64](packer: Packer, value: T): Packer {.discardable.} =
|
||||
packer.headerStream.write(value)
|
||||
proc add*[T: uint8 | uint16 | uint32 | uint64](packer: Packer, value: T): Packer {.discardable.} =
|
||||
packer.stream.write(value)
|
||||
return packer
|
||||
|
||||
proc addToPayload*[T: uint8 | uint16 | uint32 | uint64](packer: Packer, value: T): Packer {.discardable.} =
|
||||
packer.payloadStream.write(value)
|
||||
return packer
|
||||
|
||||
proc addDataToHeader*(packer: Packer, data: openArray[byte]): Packer {.discardable.} =
|
||||
packer.headerStream.writeData(data[0].unsafeAddr, data.len)
|
||||
return packer
|
||||
|
||||
proc addDataToPayload*(packer: Packer, data: openArray[byte]): Packer {.discardable.} =
|
||||
packer.payloadStream.writeData(data[0].unsafeAddr, data.len)
|
||||
proc addData*(packer: Packer, data: openArray[byte]): Packer {.discardable.} =
|
||||
packer.stream.writeData(data[0].unsafeAddr, data.len)
|
||||
return packer
|
||||
|
||||
proc addArgument*(packer: Packer, arg: TaskArg): Packer {.discardable.} =
|
||||
@@ -33,34 +23,79 @@ proc addArgument*(packer: Packer, arg: TaskArg): Packer {.discardable.} =
|
||||
# Optional argument was passed as "", ignore
|
||||
return
|
||||
|
||||
packer.addToPayload(arg.argType)
|
||||
packer.add(arg.argType)
|
||||
|
||||
case arg.argType:
|
||||
of cast[uint8](STRING), cast[uint8](BINARY):
|
||||
# Add length for variable-length data types
|
||||
packer.addToPayload(cast[uint32](arg.data.len))
|
||||
packer.addDataToPayload(arg.data)
|
||||
packer.add(cast[uint32](arg.data.len))
|
||||
packer.addData(arg.data)
|
||||
else:
|
||||
packer.addDataToPayload(arg.data)
|
||||
packer.addData(arg.data)
|
||||
return packer
|
||||
|
||||
proc packPayload*(packer: Packer): seq[byte] =
|
||||
packer.payloadStream.setPosition(0)
|
||||
let data = packer.payloadStream.readAll()
|
||||
proc pack*(packer: Packer): seq[byte] =
|
||||
packer.stream.setPosition(0)
|
||||
let data = packer.stream.readAll()
|
||||
|
||||
result = newSeq[byte](data.len)
|
||||
for i, c in data:
|
||||
result[i] = byte(c.ord)
|
||||
|
||||
packer.payloadStream.setPosition(0)
|
||||
packer.stream.setPosition(0)
|
||||
|
||||
proc packHeader*(packer: Packer): seq[byte] =
|
||||
packer.headerStream.setPosition(0)
|
||||
let data = packer.headerStream.readAll()
|
||||
proc reset*(packer: Packer): Packer {.discardable.} =
|
||||
packer.stream.close()
|
||||
packer.stream = newStringStream()
|
||||
return packer
|
||||
|
||||
type
|
||||
Unpacker* = ref object
|
||||
stream: StringStream
|
||||
position: int
|
||||
|
||||
proc initUnpacker*(data: string): Unpacker =
|
||||
result = new Unpacker
|
||||
result.stream = newStringStream(data)
|
||||
result.position = 0
|
||||
|
||||
proc getUint8*(unpacker: Unpacker): uint8 =
|
||||
result = unpacker.stream.readUint8()
|
||||
unpacker.position += 1
|
||||
|
||||
proc getUint16*(unpacker: Unpacker): uint16 =
|
||||
result = unpacker.stream.readUint16()
|
||||
unpacker.position += 2
|
||||
|
||||
proc getUint32*(unpacker: Unpacker): uint32 =
|
||||
result = unpacker.stream.readUint32()
|
||||
unpacker.position += 4
|
||||
|
||||
proc getUint64*(unpacker: Unpacker): uint64 =
|
||||
result = unpacker.stream.readUint64()
|
||||
unpacker.position += 8
|
||||
|
||||
proc getBytes*(unpacker: Unpacker, length: int): seq[byte] =
|
||||
result = newSeq[byte](length)
|
||||
let bytesRead = unpacker.stream.readData(result[0].addr, length)
|
||||
unpacker.position += bytesRead
|
||||
|
||||
# Convert string to seq[byte]
|
||||
result = newSeq[byte](data.len)
|
||||
for i, c in data:
|
||||
result[i] = byte(c.ord)
|
||||
if bytesRead != length:
|
||||
raise newException(IOError, "Not enough data to read")
|
||||
|
||||
proc getArgument*(unpacker: Unpacker): TaskArg =
|
||||
result.argType = unpacker.getUint8()
|
||||
|
||||
packer.headerStream.setPosition(0)
|
||||
case result.argType:
|
||||
of cast[uint8](STRING), cast[uint8](BINARY):
|
||||
# Variable-length fields are prefixed with the content-length
|
||||
let length = unpacker.getUint32()
|
||||
result.data = unpacker.getBytes(int(length))
|
||||
of cast[uint8](INT):
|
||||
result.data = unpacker.getBytes(4)
|
||||
of cast[uint8](LONG):
|
||||
result.data = unpacker.getBytes(8)
|
||||
of cast[uint8](BOOL):
|
||||
result.data = unpacker.getBytes(1)
|
||||
else:
|
||||
discard
|
||||
Reference in New Issue
Block a user