Restart unhealthy (#417) (#441)

This commit is contained in:
Quentin McGaw
2021-05-04 15:36:12 -04:00
committed by GitHub
parent 954e3c70b2
commit 167a0b0b29
4 changed files with 69 additions and 10 deletions

View File

@@ -253,9 +253,10 @@ func _main(ctx context.Context, buildInfo models.BuildInformation,
} // TODO move inside firewall? } // TODO move inside firewall?
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
healthy := make(chan bool)
openvpnLooper := openvpn.NewLooper(allSettings.OpenVPN, nonRootUsername, puid, pgid, allServers, openvpnLooper := openvpn.NewLooper(allSettings.OpenVPN, nonRootUsername, puid, pgid, allServers,
ovpnConf, firewallConf, routingConf, logger, httpClient, os.OpenFile, tunnelReadyCh, cancel) ovpnConf, firewallConf, routingConf, logger, httpClient, os.OpenFile, tunnelReadyCh, healthy, cancel)
wg.Add(1) wg.Add(1)
// wait for restartOpenvpn // wait for restartOpenvpn
go openvpnLooper.Run(ctx, wg) go openvpnLooper.Run(ctx, wg)
@@ -302,7 +303,7 @@ func _main(ctx context.Context, buildInfo models.BuildInformation,
healthcheckServer := healthcheck.NewServer( healthcheckServer := healthcheck.NewServer(
constants.HealthcheckAddress, logger) constants.HealthcheckAddress, logger)
wg.Add(1) wg.Add(1)
go healthcheckServer.Run(ctx, wg) go healthcheckServer.Run(ctx, healthy, wg)
// Start openvpn for the first time in a blocking call // Start openvpn for the first time in a blocking call
// until openvpn is launched // until openvpn is launched

View File

@@ -10,7 +10,7 @@ import (
"time" "time"
) )
func (s *server) runHealthcheckLoop(ctx context.Context, wg *sync.WaitGroup) { func (s *server) runHealthcheckLoop(ctx context.Context, healthy chan<- bool, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
for { for {
previousErr := s.handler.getErr() previousErr := s.handler.getErr()
@@ -18,6 +18,12 @@ func (s *server) runHealthcheckLoop(ctx context.Context, wg *sync.WaitGroup) {
err := healthCheck(ctx, s.resolver) err := healthCheck(ctx, s.resolver)
s.handler.setErr(err) s.handler.setErr(err)
// Notify the healthy channel, or not if it's already full
select {
case healthy <- err == nil:
default:
}
if previousErr != nil && err == nil { if previousErr != nil && err == nil {
s.logger.Info("healthy!") s.logger.Info("healthy!")
} else if previousErr == nil && err != nil { } else if previousErr == nil && err != nil {
@@ -36,8 +42,8 @@ func (s *server) runHealthcheckLoop(ctx context.Context, wg *sync.WaitGroup) {
} }
continue continue
} }
// Success, check again in 10 minutes // Success, check again in 5 seconds
const period = 10 * time.Minute const period = 5 * time.Second
timer := time.NewTimer(period) timer := time.NewTimer(period)
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@@ -12,7 +12,7 @@ import (
) )
type Server interface { type Server interface {
Run(ctx context.Context, wg *sync.WaitGroup) Run(ctx context.Context, healthy chan<- bool, wg *sync.WaitGroup)
} }
type server struct { type server struct {
@@ -32,12 +32,12 @@ func NewServer(address string, logger logging.Logger) Server {
} }
} }
func (s *server) Run(ctx context.Context, wg *sync.WaitGroup) { func (s *server) Run(ctx context.Context, healthy chan<- bool, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
internalWg := &sync.WaitGroup{} internalWg := &sync.WaitGroup{}
internalWg.Add(1) internalWg.Add(1)
go s.runHealthcheckLoop(ctx, internalWg) go s.runHealthcheckLoop(ctx, healthy, internalWg)
server := http.Server{ server := http.Server{
Addr: s.address, Addr: s.address,

View File

@@ -45,6 +45,7 @@ type looper struct {
client *http.Client client *http.Client
openFile os.OpenFileFunc openFile os.OpenFileFunc
tunnelReady chan<- struct{} tunnelReady chan<- struct{}
healthy <-chan bool
cancel context.CancelFunc cancel context.CancelFunc
// Internal channels and locks // Internal channels and locks
loopLock sync.Mutex loopLock sync.Mutex
@@ -54,15 +55,19 @@ type looper struct {
portForwardSignals chan net.IP portForwardSignals chan net.IP
crashed bool crashed bool
backoffTime time.Duration backoffTime time.Duration
healthWaitTime time.Duration
} }
const defaultBackoffTime = 15 * time.Second const (
defaultBackoffTime = 15 * time.Second
defaultHealthWaitTime = 6 * time.Second
)
func NewLooper(settings configuration.OpenVPN, func NewLooper(settings configuration.OpenVPN,
username string, puid, pgid int, allServers models.AllServers, username string, puid, pgid int, allServers models.AllServers,
conf Configurator, fw firewall.Configurator, routing routing.Routing, conf Configurator, fw firewall.Configurator, routing routing.Routing,
logger logging.Logger, client *http.Client, openFile os.OpenFileFunc, logger logging.Logger, client *http.Client, openFile os.OpenFileFunc,
tunnelReady chan<- struct{}, cancel context.CancelFunc) Looper { tunnelReady chan<- struct{}, healthy <-chan bool, cancel context.CancelFunc) Looper {
return &looper{ return &looper{
state: state{ state: state{
status: constants.Stopped, status: constants.Stopped,
@@ -80,6 +85,7 @@ func NewLooper(settings configuration.OpenVPN,
client: client, client: client,
openFile: openFile, openFile: openFile,
tunnelReady: tunnelReady, tunnelReady: tunnelReady,
healthy: healthy,
cancel: cancel, cancel: cancel,
start: make(chan struct{}), start: make(chan struct{}),
running: make(chan models.LoopStatus), running: make(chan models.LoopStatus),
@@ -87,6 +93,7 @@ func NewLooper(settings configuration.OpenVPN,
stopped: make(chan struct{}), stopped: make(chan struct{}),
portForwardSignals: make(chan net.IP), portForwardSignals: make(chan net.IP),
backoffTime: defaultBackoffTime, backoffTime: defaultBackoffTime,
healthWaitTime: defaultHealthWaitTime,
} }
} }
@@ -215,6 +222,22 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup) { //nolint:gocogni
l.logAndWait(ctx, err) l.logAndWait(ctx, err)
l.crashed = true l.crashed = true
stayHere = false stayHere = false
case healthy := <-l.healthy:
if healthy {
continue
}
// ensure it stays unhealthy for some time before restarting it
healthy = l.waitForHealth(ctx)
if healthy || ctx.Err() != nil {
continue
}
l.crashed = true // flag as crashed
l.state.setStatusWithLock(constants.Stopping)
l.logger.Warn("unhealthy program: restarting openvpn")
openvpnCancel()
<-waitError
l.state.setStatusWithLock(constants.Stopped)
stayHere = false
} }
} }
close(waitError) close(waitError)
@@ -240,6 +263,35 @@ func (l *looper) logAndWait(ctx context.Context, err error) {
} }
} }
// waitForHealth waits for a true healthy signal
// after restarting openvpn in order to avoid restarting
// openvpn in a loop as it requires a few seconds to connect.
func (l *looper) waitForHealth(ctx context.Context) (healthy bool) {
l.logger.Info("unhealthy program: waiting %s for it to change to healthy", l.healthWaitTime)
timer := time.NewTimer(l.healthWaitTime)
l.healthWaitTime *= 2
for {
select {
case healthy = <-l.healthy:
if !healthy {
break
}
if !timer.Stop() {
<-timer.C
}
l.healthWaitTime = defaultHealthWaitTime
return true
case <-timer.C:
return false
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return false
}
}
}
// portForward is a blocking operation which may or may not be infinite. // portForward is a blocking operation which may or may not be infinite.
// You should therefore always call it in a goroutine. // You should therefore always call it in a goroutine.
func (l *looper) portForward(ctx context.Context, wg *sync.WaitGroup, func (l *looper) portForward(ctx context.Context, wg *sync.WaitGroup,