diff --git a/internal/openvpn/logs.go b/internal/openvpn/config/logs.go similarity index 67% rename from internal/openvpn/logs.go rename to internal/openvpn/config/logs.go index c880e88d..04b66a05 100644 --- a/internal/openvpn/logs.go +++ b/internal/openvpn/config/logs.go @@ -1,7 +1,6 @@ -package openvpn +package config import ( - "context" "strings" "github.com/fatih/color" @@ -9,48 +8,6 @@ import ( "github.com/qdm12/golibs/logging" ) -func (l *Loop) collectLines(ctx context.Context, done chan<- struct{}, - stdout, stderr chan string, tunnelUpData tunnelUpData) { - defer close(done) - - var line string - - for { - errLine := false - select { - case <-ctx.Done(): - // Context should only be canceled after stdout and stderr are done - // being written to. - close(stdout) - close(stderr) - return - case line = <-stdout: - case line = <-stderr: - errLine = true - } - line, level := processLogLine(line) - if line == "" { - continue // filtered out - } - if errLine { - level = logging.LevelError - } - switch level { - case logging.LevelDebug: - l.logger.Debug(line) - case logging.LevelInfo: - l.logger.Info(line) - case logging.LevelWarn: - l.logger.Warn(line) - case logging.LevelError: - l.logger.Error(line) - } - if strings.Contains(line, "Initialization Sequence Completed") { - l.onTunnelUp(ctx, tunnelUpData) - } - } -} - func processLogLine(s string) (filtered string, level logging.Level) { for _, ignored := range []string{ "WARNING: you are using user/group/chroot/setcon without persist-tun -- this may cause restarts to fail", diff --git a/internal/openvpn/logs_test.go b/internal/openvpn/config/logs_test.go similarity index 98% rename from internal/openvpn/logs_test.go rename to internal/openvpn/config/logs_test.go index 4c30332f..03c3f57d 100644 --- a/internal/openvpn/logs_test.go +++ b/internal/openvpn/config/logs_test.go @@ -1,4 +1,4 @@ -package openvpn +package config import ( "testing" diff --git a/internal/openvpn/config/openvpn.go b/internal/openvpn/config/openvpn.go index 537694d2..d35b17eb 100644 --- a/internal/openvpn/config/openvpn.go +++ b/internal/openvpn/config/openvpn.go @@ -6,10 +6,12 @@ import ( "github.com/qdm12/golibs/logging" ) +var _ Interface = (*Configurator)(nil) + type Interface interface { VersionGetter AuthWriter - Starter + Runner Writer } diff --git a/internal/openvpn/config/run.go b/internal/openvpn/config/run.go new file mode 100644 index 00000000..6c1607f5 --- /dev/null +++ b/internal/openvpn/config/run.go @@ -0,0 +1,41 @@ +package config + +import ( + "context" + + "github.com/qdm12/gluetun/internal/configuration" + "github.com/qdm12/golibs/logging" +) + +type Runner interface { + Run(ctx context.Context, errCh chan<- error, ready chan<- struct{}, + logger logging.Logger, settings configuration.OpenVPN) +} + +func (c *Configurator) Run(ctx context.Context, errCh chan<- error, + ready chan<- struct{}, logger logging.Logger, settings configuration.OpenVPN) { + stdoutLines, stderrLines, waitError, err := c.start(ctx, settings.Version, settings.Flags) + if err != nil { + errCh <- err + return + } + + streamCtx, streamCancel := context.WithCancel(context.Background()) + streamDone := make(chan struct{}) + go streamLines(streamCtx, streamDone, logger, + stdoutLines, stderrLines, ready) + + select { + case <-ctx.Done(): + <-waitError + close(waitError) + streamCancel() + <-streamDone + errCh <- ctx.Err() + case err := <-waitError: + close(waitError) + streamCancel() + <-streamDone + errCh <- err + } +} diff --git a/internal/openvpn/config/start.go b/internal/openvpn/config/start.go index 668972a6..ef982dc4 100644 --- a/internal/openvpn/config/start.go +++ b/internal/openvpn/config/start.go @@ -17,12 +17,7 @@ const ( binOpenvpn25 = "openvpn" ) -type Starter interface { - Start(ctx context.Context, version string, flags []string) ( - stdoutLines, stderrLines chan string, waitError chan error, err error) -} - -func (c *Configurator) Start(ctx context.Context, version string, flags []string) ( +func (c *Configurator) start(ctx context.Context, version string, flags []string) ( stdoutLines, stderrLines chan string, waitError chan error, err error) { var bin string switch version { diff --git a/internal/openvpn/config/stream.go b/internal/openvpn/config/stream.go new file mode 100644 index 00000000..d01b88e5 --- /dev/null +++ b/internal/openvpn/config/stream.go @@ -0,0 +1,53 @@ +package config + +import ( + "context" + "strings" + + "github.com/qdm12/golibs/logging" +) + +func streamLines(ctx context.Context, done chan<- struct{}, + logger logging.Logger, stdout, stderr chan string, + tunnelReady chan<- struct{}) { + defer close(done) + + var line string + + for { + errLine := false + select { + case <-ctx.Done(): + // Context should only be canceled after stdout and stderr are done + // being written to. + close(stdout) + close(stderr) + return + case line = <-stdout: + case line = <-stderr: + errLine = true + } + line, level := processLogLine(line) + if line == "" { + continue // filtered out + } + if errLine { + level = logging.LevelError + } + switch level { + case logging.LevelDebug: + logger.Debug(line) + case logging.LevelInfo: + logger.Info(line) + case logging.LevelWarn: + logger.Warn(line) + case logging.LevelError: + logger.Error(line) + } + if strings.Contains(line, "Initialization Sequence Completed") { + // do not close tunnelReady in case the initialization + // happens multiple times without Openvpn restarting + tunnelReady <- struct{}{} + } + } +} diff --git a/internal/openvpn/helpers.go b/internal/openvpn/helpers.go index 2d9a40f8..7484771a 100644 --- a/internal/openvpn/helpers.go +++ b/internal/openvpn/helpers.go @@ -8,6 +8,28 @@ import ( "github.com/qdm12/gluetun/internal/models" ) +// waitForError waits 100ms for an error in the waitError channel. +func (l *Loop) waitForError(ctx context.Context, + waitError chan error) (err error) { + const waitDurationForError = 100 * time.Millisecond + timer := time.NewTimer(waitDurationForError) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return ctx.Err() + case <-timer.C: + return nil + case err := <-waitError: + close(waitError) + if !timer.Stop() { + <-timer.C + } + return err + } +} + func (l *Loop) crashed(ctx context.Context, err error) { l.signalOrSetStatus(constants.Crashed) l.logAndWait(ctx, err) diff --git a/internal/openvpn/run.go b/internal/openvpn/run.go index 6c330ff9..fff53642 100644 --- a/internal/openvpn/run.go +++ b/internal/openvpn/run.go @@ -31,29 +31,23 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) { l.crashed(ctx, err) continue } - - openvpnCtx, openvpnCancel := context.WithCancel(context.Background()) - - stdoutLines, stderrLines, waitError, err := l.openvpnConf.Start( - openvpnCtx, openVPNSettings.Version, openVPNSettings.Flags) - if err != nil { - openvpnCancel() - l.crashed(ctx, err) - continue - } - - linesCollectionCtx, linesCollectionCancel := context.WithCancel(context.Background()) - lineCollectionDone := make(chan struct{}) tunnelUpData := tunnelUpData{ portForwarding: providerSettings.PortForwarding.Enabled, serverName: serverName, portForwarder: providerConf, } - go l.collectLines(linesCollectionCtx, lineCollectionDone, - stdoutLines, stderrLines, tunnelUpData) - closeStreams := func() { - linesCollectionCancel() - <-lineCollectionDone + + openvpnCtx, openvpnCancel := context.WithCancel(context.Background()) + waitError := make(chan error) + tunnelReady := make(chan struct{}) + + go l.openvpnConf.Run(openvpnCtx, waitError, tunnelReady, + l.logger, openVPNSettings) + + if err := l.waitForError(ctx, waitError); err != nil { + openvpnCancel() + l.crashed(ctx, err) + continue } l.backoffTime = defaultBackoffTime @@ -62,6 +56,8 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) { stayHere := true for stayHere { select { + case <-tunnelReady: + go l.onTunnelUp(openvpnCtx, tunnelUpData) case <-ctx.Done(): const pfTimeout = 100 * time.Millisecond l.stopPortForwarding(context.Background(), @@ -69,7 +65,6 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) { openvpnCancel() <-waitError close(waitError) - closeStreams() return case <-l.stop: l.userTrigger = true @@ -79,7 +74,6 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) { <-waitError // do not close waitError or the waitError // select case will trigger - closeStreams() l.stopped <- struct{}{} case <-l.start: l.userTrigger = true @@ -87,7 +81,6 @@ func (l *Loop) Run(ctx context.Context, done chan<- struct{}) { stayHere = false case err := <-waitError: // unexpected error close(waitError) - closeStreams() l.statusManager.Lock() // prevent SetStatus from running in parallel