Loops and HTTP control server rework (#308)

- CRUD REST HTTP server
- `/v1` HTTP server prefix
- Retrocompatible with older routes (redirects to v1 or handles the requests directly)
- DNS, Updater and Openvpn refactored to have a REST-like state with new methods to change their states synchronously
- Openvpn, Unbound and Updater status, see #287
This commit is contained in:
Quentin McGaw
2020-12-19 20:10:34 -05:00
committed by GitHub
parent d60d629105
commit 4257581f55
30 changed files with 1191 additions and 438 deletions

View File

@@ -20,23 +20,18 @@ import (
type Looper interface {
Run(ctx context.Context, wg *sync.WaitGroup)
Restart()
PortForward(vpnGatewayIP net.IP)
GetStatus() (status models.LoopStatus)
SetStatus(status models.LoopStatus) (outcome string, err error)
GetSettings() (settings settings.OpenVPN)
SetSettings(settings settings.OpenVPN)
GetPortForwarded() (portForwarded uint16)
SetAllServers(allServers models.AllServers)
SetSettings(settings settings.OpenVPN) (outcome string)
GetServers() (servers models.AllServers)
SetServers(servers models.AllServers)
GetPortForwarded() (port uint16)
PortForward(vpnGatewayIP net.IP)
}
type looper struct {
// Variable parameters
provider models.VPNProvider
settings settings.OpenVPN
settingsMutex sync.RWMutex
portForwarded uint16
portForwardedMutex sync.RWMutex
allServers models.AllServers
allServersMutex sync.RWMutex
state state
// Fixed parameters
uid int
gid int
@@ -50,22 +45,27 @@ type looper struct {
fileManager files.FileManager
streamMerger command.StreamMerger
cancel context.CancelFunc
// Internal channels
restart chan struct{}
// Internal channels and locks
loopLock sync.Mutex
running chan models.LoopStatus
stop, stopped chan struct{}
start chan struct{}
portForwardSignals chan net.IP
}
func NewLooper(provider models.VPNProvider, settings settings.OpenVPN,
func NewLooper(settings settings.OpenVPN,
uid, gid int, allServers models.AllServers,
conf Configurator, fw firewall.Configurator, routing routing.Routing,
logger logging.Logger, client *http.Client, fileManager files.FileManager,
streamMerger command.StreamMerger, cancel context.CancelFunc) Looper {
return &looper{
provider: provider,
settings: settings,
state: state{
status: constants.Stopped,
settings: settings,
allServers: allServers,
},
uid: uid,
gid: gid,
allServers: allServers,
conf: conf,
fw: fw,
routing: routing,
@@ -75,46 +75,29 @@ func NewLooper(provider models.VPNProvider, settings settings.OpenVPN,
fileManager: fileManager,
streamMerger: streamMerger,
cancel: cancel,
restart: make(chan struct{}),
start: make(chan struct{}),
running: make(chan models.LoopStatus),
stop: make(chan struct{}),
stopped: make(chan struct{}),
portForwardSignals: make(chan net.IP),
}
}
func (l *looper) Restart() { l.restart <- struct{}{} }
func (l *looper) PortForward(vpnGateway net.IP) { l.portForwardSignals <- vpnGateway }
func (l *looper) GetSettings() (settings settings.OpenVPN) {
l.settingsMutex.RLock()
defer l.settingsMutex.RUnlock()
return l.settings
}
func (l *looper) SetSettings(settings settings.OpenVPN) {
l.settingsMutex.Lock()
defer l.settingsMutex.Unlock()
l.settings = settings
}
func (l *looper) SetAllServers(allServers models.AllServers) {
l.allServersMutex.Lock()
defer l.allServersMutex.Unlock()
l.allServers = allServers
}
func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
crashed := false
select {
case <-l.restart:
case <-l.start:
case <-ctx.Done():
return
}
defer l.logger.Warn("loop exited")
for ctx.Err() == nil {
settings := l.GetSettings()
l.allServersMutex.RLock()
providerConf := provider.New(l.provider, l.allServers, time.Now)
l.allServersMutex.RUnlock()
settings, allServers := l.state.getSettingsAndServers()
providerConf := provider.New(settings.Provider.Name, allServers, time.Now)
connection, err := providerConf.GetOpenVPNConnection(settings.Provider.ServerSelection)
if err != nil {
l.logger.Error(err)
@@ -155,6 +138,10 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup) {
stream, waitFn, err := l.conf.Start(openvpnCtx)
if err != nil {
openvpnCancel()
if !crashed {
l.running <- constants.Crashed
crashed = true
}
l.logAndWait(ctx, err)
continue
}
@@ -179,23 +166,41 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup) {
err := waitFn() // blocking
waitError <- err
}()
select {
case <-ctx.Done():
l.logger.Warn("context canceled: exiting loop")
openvpnCancel()
<-waitError
close(waitError)
return
case <-l.restart: // triggered restart
l.logger.Info("restarting")
openvpnCancel()
<-waitError
close(waitError)
case err := <-waitError: // unexpected error
openvpnCancel()
close(waitError)
l.logAndWait(ctx, err)
if !crashed {
l.running <- constants.Running
crashed = false
} else {
l.state.setStatusWithLock(constants.Running)
}
stayHere := true
for stayHere {
select {
case <-ctx.Done():
l.logger.Warn("context canceled: exiting loop")
openvpnCancel()
<-waitError
close(waitError)
return
case <-l.stop:
l.logger.Info("stopping")
openvpnCancel()
<-waitError
l.stopped <- struct{}{}
case <-l.start:
l.logger.Info("starting")
stayHere = false
case err := <-waitError: // unexpected error
openvpnCancel()
l.state.setStatusWithLock(constants.Crashed)
l.logAndWait(ctx, err)
crashed = true
stayHere = false
}
}
close(waitError)
openvpnCancel() // just for the linter
}
}
@@ -218,24 +223,21 @@ func (l *looper) logAndWait(ctx context.Context, err error) {
func (l *looper) portForward(ctx context.Context, wg *sync.WaitGroup,
providerConf provider.Provider, client *http.Client, gateway net.IP) {
defer wg.Done()
settings := l.GetSettings()
l.state.portForwardedMu.RLock()
settings := l.state.settings
l.state.portForwardedMu.RUnlock()
if !settings.Provider.PortForwarding.Enabled {
return
}
syncState := func(port uint16) (pfFilepath models.Filepath) {
l.portForwardedMutex.Lock()
l.portForwarded = port
l.portForwardedMutex.Unlock()
settings := l.GetSettings()
l.state.portForwardedMu.Lock()
defer l.state.portForwardedMu.Unlock()
l.state.portForwarded = port
l.state.settingsMu.RLock()
defer l.state.settingsMu.RUnlock()
return settings.Provider.PortForwarding.Filepath
}
providerConf.PortForward(ctx,
client, l.fileManager, l.pfLogger,
gateway, l.fw, syncState)
}
func (l *looper) GetPortForwarded() (portForwarded uint16) {
l.portForwardedMutex.RLock()
defer l.portForwardedMutex.RUnlock()
return l.portForwarded
}