Maint: move OpenVPN streams processing to config package

This commit is contained in:
Quentin McGaw (desktop)
2021-08-18 21:16:28 +00:00
parent a0cb6fabfd
commit 0027a76c49
8 changed files with 136 additions and 73 deletions

View File

@@ -1,7 +1,6 @@
package openvpn
package config
import (
"context"
"strings"
"github.com/fatih/color"
@@ -9,48 +8,6 @@ import (
"github.com/qdm12/golibs/logging"
)
func (l *Loop) collectLines(ctx context.Context, done chan<- struct{},
stdout, stderr chan string, tunnelUpData tunnelUpData) {
defer close(done)
var line string
for {
errLine := false
select {
case <-ctx.Done():
// Context should only be canceled after stdout and stderr are done
// being written to.
close(stdout)
close(stderr)
return
case line = <-stdout:
case line = <-stderr:
errLine = true
}
line, level := processLogLine(line)
if line == "" {
continue // filtered out
}
if errLine {
level = logging.LevelError
}
switch level {
case logging.LevelDebug:
l.logger.Debug(line)
case logging.LevelInfo:
l.logger.Info(line)
case logging.LevelWarn:
l.logger.Warn(line)
case logging.LevelError:
l.logger.Error(line)
}
if strings.Contains(line, "Initialization Sequence Completed") {
l.onTunnelUp(ctx, tunnelUpData)
}
}
}
func processLogLine(s string) (filtered string, level logging.Level) {
for _, ignored := range []string{
"WARNING: you are using user/group/chroot/setcon without persist-tun -- this may cause restarts to fail",

View File

@@ -1,4 +1,4 @@
package openvpn
package config
import (
"testing"

View File

@@ -6,10 +6,12 @@ import (
"github.com/qdm12/golibs/logging"
)
var _ Interface = (*Configurator)(nil)
type Interface interface {
VersionGetter
AuthWriter
Starter
Runner
Writer
}

View File

@@ -0,0 +1,41 @@
package config
import (
"context"
"github.com/qdm12/gluetun/internal/configuration"
"github.com/qdm12/golibs/logging"
)
type Runner interface {
Run(ctx context.Context, errCh chan<- error, ready chan<- struct{},
logger logging.Logger, settings configuration.OpenVPN)
}
func (c *Configurator) Run(ctx context.Context, errCh chan<- error,
ready chan<- struct{}, logger logging.Logger, settings configuration.OpenVPN) {
stdoutLines, stderrLines, waitError, err := c.start(ctx, settings.Version, settings.Flags)
if err != nil {
errCh <- err
return
}
streamCtx, streamCancel := context.WithCancel(context.Background())
streamDone := make(chan struct{})
go streamLines(streamCtx, streamDone, logger,
stdoutLines, stderrLines, ready)
select {
case <-ctx.Done():
<-waitError
close(waitError)
streamCancel()
<-streamDone
errCh <- ctx.Err()
case err := <-waitError:
close(waitError)
streamCancel()
<-streamDone
errCh <- err
}
}

View File

@@ -17,12 +17,7 @@ const (
binOpenvpn25 = "openvpn"
)
type Starter interface {
Start(ctx context.Context, version string, flags []string) (
stdoutLines, stderrLines chan string, waitError chan error, err error)
}
func (c *Configurator) Start(ctx context.Context, version string, flags []string) (
func (c *Configurator) start(ctx context.Context, version string, flags []string) (
stdoutLines, stderrLines chan string, waitError chan error, err error) {
var bin string
switch version {

View File

@@ -0,0 +1,53 @@
package config
import (
"context"
"strings"
"github.com/qdm12/golibs/logging"
)
func streamLines(ctx context.Context, done chan<- struct{},
logger logging.Logger, stdout, stderr chan string,
tunnelReady chan<- struct{}) {
defer close(done)
var line string
for {
errLine := false
select {
case <-ctx.Done():
// Context should only be canceled after stdout and stderr are done
// being written to.
close(stdout)
close(stderr)
return
case line = <-stdout:
case line = <-stderr:
errLine = true
}
line, level := processLogLine(line)
if line == "" {
continue // filtered out
}
if errLine {
level = logging.LevelError
}
switch level {
case logging.LevelDebug:
logger.Debug(line)
case logging.LevelInfo:
logger.Info(line)
case logging.LevelWarn:
logger.Warn(line)
case logging.LevelError:
logger.Error(line)
}
if strings.Contains(line, "Initialization Sequence Completed") {
// do not close tunnelReady in case the initialization
// happens multiple times without Openvpn restarting
tunnelReady <- struct{}{}
}
}
}

View File

@@ -8,6 +8,28 @@ import (
"github.com/qdm12/gluetun/internal/models"
)
// waitForError waits 100ms for an error in the waitError channel.
func (l *Loop) waitForError(ctx context.Context,
waitError chan error) (err error) {
const waitDurationForError = 100 * time.Millisecond
timer := time.NewTimer(waitDurationForError)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return ctx.Err()
case <-timer.C:
return nil
case err := <-waitError:
close(waitError)
if !timer.Stop() {
<-timer.C
}
return err
}
}
func (l *Loop) crashed(ctx context.Context, err error) {
l.signalOrSetStatus(constants.Crashed)
l.logAndWait(ctx, err)

View File

@@ -31,29 +31,23 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) {
l.crashed(ctx, err)
continue
}
openvpnCtx, openvpnCancel := context.WithCancel(context.Background())
stdoutLines, stderrLines, waitError, err := l.openvpnConf.Start(
openvpnCtx, openVPNSettings.Version, openVPNSettings.Flags)
if err != nil {
openvpnCancel()
l.crashed(ctx, err)
continue
}
linesCollectionCtx, linesCollectionCancel := context.WithCancel(context.Background())
lineCollectionDone := make(chan struct{})
tunnelUpData := tunnelUpData{
portForwarding: providerSettings.PortForwarding.Enabled,
serverName: serverName,
portForwarder: providerConf,
}
go l.collectLines(linesCollectionCtx, lineCollectionDone,
stdoutLines, stderrLines, tunnelUpData)
closeStreams := func() {
linesCollectionCancel()
<-lineCollectionDone
openvpnCtx, openvpnCancel := context.WithCancel(context.Background())
waitError := make(chan error)
tunnelReady := make(chan struct{})
go l.openvpnConf.Run(openvpnCtx, waitError, tunnelReady,
l.logger, openVPNSettings)
if err := l.waitForError(ctx, waitError); err != nil {
openvpnCancel()
l.crashed(ctx, err)
continue
}
l.backoffTime = defaultBackoffTime
@@ -62,6 +56,8 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) {
stayHere := true
for stayHere {
select {
case <-tunnelReady:
go l.onTunnelUp(openvpnCtx, tunnelUpData)
case <-ctx.Done():
const pfTimeout = 100 * time.Millisecond
l.stopPortForwarding(context.Background(),
@@ -69,7 +65,6 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) {
openvpnCancel()
<-waitError
close(waitError)
closeStreams()
return
case <-l.stop:
l.userTrigger = true
@@ -79,7 +74,6 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) {
<-waitError
// do not close waitError or the waitError
// select case will trigger
closeStreams()
l.stopped <- struct{}{}
case <-l.start:
l.userTrigger = true
@@ -87,7 +81,6 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) {
stayHere = false
case err := <-waitError: // unexpected error
close(waitError)
closeStreams()
l.statusManager.Lock() // prevent SetStatus from running in parallel