Maintenance: improve stream merging

This commit is contained in:
Quentin McGaw
2021-01-26 04:17:22 +00:00
parent 937d09f1c3
commit a243d48fb1
13 changed files with 321 additions and 248 deletions

61
internal/dns/logs.go Normal file
View File

@@ -0,0 +1,61 @@
package dns
import (
"regexp"
"strings"
"sync"
"github.com/qdm12/gluetun/internal/constants"
"github.com/qdm12/golibs/logging"
)
func (l *looper) collectLines(wg *sync.WaitGroup, stdout, stderr <-chan string) {
defer wg.Done()
var line string
var ok bool
for {
select {
case line, ok = <-stderr:
case line, ok = <-stdout:
}
if !ok {
return
}
line, level := processLogLine(line)
switch level {
case logging.DebugLevel:
l.logger.Debug(line)
case logging.InfoLevel:
l.logger.Info(line)
case logging.WarnLevel:
l.logger.Warn(line)
case logging.ErrorLevel:
l.logger.Error(line)
}
}
}
var unboundPrefix = regexp.MustCompile(`\[[0-9]{10}\] unbound\[[0-9]+:[0|1]\] `)
func processLogLine(s string) (filtered string, level logging.Level) {
prefix := unboundPrefix.FindString(s)
filtered = s[len(prefix):]
switch {
case strings.HasPrefix(filtered, "notice: "):
filtered = strings.TrimPrefix(filtered, "notice: ")
level = logging.InfoLevel
case strings.HasPrefix(filtered, "info: "):
filtered = strings.TrimPrefix(filtered, "info: ")
level = logging.InfoLevel
case strings.HasPrefix(filtered, "warn: "):
filtered = strings.TrimPrefix(filtered, "warn: ")
level = logging.WarnLevel
case strings.HasPrefix(filtered, "error: "):
filtered = strings.TrimPrefix(filtered, "error: ")
level = logging.ErrorLevel
default:
level = logging.InfoLevel
}
filtered = constants.ColorUnbound().Sprintf(filtered)
return filtered, level
}

49
internal/dns/logs_test.go Normal file
View File

@@ -0,0 +1,49 @@
package dns
import (
"testing"
"github.com/qdm12/golibs/logging"
"github.com/stretchr/testify/assert"
)
func Test_processLogLine(t *testing.T) {
t.Parallel()
tests := map[string]struct {
s string
filtered string
level logging.Level
}{
"empty string": {"", "", logging.InfoLevel},
"random string": {"asdasqdb", "asdasqdb", logging.InfoLevel},
"unbound notice": {
"[1594595249] unbound[75:0] notice: init module 0: validator",
"init module 0: validator",
logging.InfoLevel},
"unbound info": {
"[1594595249] unbound[75:0] info: init module 0: validator",
"init module 0: validator",
logging.InfoLevel},
"unbound warn": {
"[1594595249] unbound[75:0] warn: init module 0: validator",
"init module 0: validator",
logging.WarnLevel},
"unbound error": {
"[1594595249] unbound[75:0] error: init module 0: validator",
"init module 0: validator",
logging.ErrorLevel},
"unbound unknown": {
"[1594595249] unbound[75:0] BLA: init module 0: validator",
"BLA: init module 0: validator",
logging.InfoLevel},
}
for name, tc := range tests {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
filtered, level := processLogLine(tc.s)
assert.Equal(t, tc.filtered, filtered)
assert.Equal(t, tc.level, level)
})
}
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/qdm12/gluetun/internal/constants"
"github.com/qdm12/gluetun/internal/models"
"github.com/qdm12/gluetun/internal/settings"
"github.com/qdm12/golibs/command"
"github.com/qdm12/golibs/logging"
)
@@ -30,7 +29,6 @@ type looper struct {
conf unbound.Configurator
client *http.Client
logger logging.Logger
streamMerger command.StreamMerger
username string
puid int
pgid int
@@ -48,8 +46,7 @@ type looper struct {
const defaultBackoffTime = 10 * time.Second
func NewLooper(conf unbound.Configurator, settings settings.DNS, client *http.Client,
logger logging.Logger, streamMerger command.StreamMerger,
username string, puid, pgid int) Looper {
logger logging.Logger, username string, puid, pgid int) Looper {
return &looper{
state: state{
status: constants.Stopped,
@@ -61,7 +58,6 @@ func NewLooper(conf unbound.Configurator, settings settings.DNS, client *http.Cl
username: username,
puid: puid,
pgid: pgid,
streamMerger: streamMerger,
start: make(chan struct{}),
running: make(chan models.LoopStatus),
stop: make(chan struct{}),
@@ -107,8 +103,10 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup, dnsReadyCh chan<-
for ctx.Err() == nil {
// Upper scope variables for Unbound only
var unboundCancel context.CancelFunc = func() {}
waitError := make(chan error)
// Their values are to be used if DOT=off
var waitError chan error
var unboundCancel context.CancelFunc
var closeStreams func()
for l.GetSettings().Enabled {
if ctx.Err() != nil {
@@ -116,7 +114,7 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup, dnsReadyCh chan<-
return
}
var err error
unboundCancel, err = l.setupUnbound(ctx, crashed, waitError)
unboundCancel, waitError, closeStreams, err = l.setupUnbound(ctx, wg, crashed)
if err != nil {
if !errors.Is(err, errUpdateFiles) {
const fallback = true
@@ -130,6 +128,9 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup, dnsReadyCh chan<-
if !l.GetSettings().Enabled {
const fallback = false
l.useUnencryptedDNS(fallback)
waitError := make(chan error)
unboundCancel = func() { waitError <- nil }
closeStreams = func() {}
}
dnsReadyCh <- struct{}{}
@@ -142,6 +143,7 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup, dnsReadyCh chan<-
unboundCancel()
<-waitError
close(waitError)
closeStreams()
return
case <-l.stop:
l.logger.Info("stopping")
@@ -163,7 +165,7 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup, dnsReadyCh chan<-
}
}
close(waitError)
unboundCancel()
closeStreams()
}
}
@@ -172,28 +174,29 @@ var errUpdateFiles = errors.New("cannot update files")
// Returning cancel == nil signals we want to re-run setupUnbound
// Returning err == errUpdateFiles signals we should not fall back
// on the plaintext DNS as DOT is still up and running.
func (l *looper) setupUnbound(ctx context.Context,
previousCrashed bool, waitError chan<- error) (cancel context.CancelFunc, err error) {
func (l *looper) setupUnbound(ctx context.Context, wg *sync.WaitGroup,
previousCrashed bool) (cancel context.CancelFunc, waitError chan error,
closeStreams func(), err error) {
err = l.updateFiles(ctx)
if err != nil {
l.state.setStatusWithLock(constants.Crashed)
return nil, errUpdateFiles
return nil, nil, nil, errUpdateFiles
}
settings := l.GetSettings()
unboundCtx, cancel := context.WithCancel(context.Background())
stream, waitFn, err := l.conf.Start(unboundCtx, settings.Unbound.VerbosityDetailsLevel)
stdoutLines, stderrLines, waitError, err := l.conf.Start(unboundCtx, settings.Unbound.VerbosityDetailsLevel)
if err != nil {
cancel()
if !previousCrashed {
l.running <- constants.Crashed
}
return nil, err
return nil, nil, nil, err
}
// Started successfully
go l.streamMerger.Merge(unboundCtx, stream, command.MergeName("unbound"))
wg.Add(1)
go l.collectLines(wg, stdoutLines, stderrLines)
l.conf.UseDNSInternally(net.IP{127, 0, 0, 1}) // use Unbound
if err := l.conf.UseDNSSystemWide(net.IP{127, 0, 0, 1}, settings.KeepNameserver); err != nil { // use Unbound
@@ -205,14 +208,13 @@ func (l *looper) setupUnbound(ctx context.Context,
l.running <- constants.Crashed
}
cancel()
return nil, err
<-waitError
close(waitError)
close(stdoutLines)
close(stderrLines)
return nil, nil, nil, err
}
go func() {
err := waitFn() // blocking
waitError <- err
}()
l.logger.Info("ready")
if !previousCrashed {
l.running <- constants.Running
@@ -220,7 +222,13 @@ func (l *looper) setupUnbound(ctx context.Context,
l.backoffTime = defaultBackoffTime
l.state.setStatusWithLock(constants.Running)
}
return cancel, nil
closeStreams = func() {
close(stdoutLines)
close(stderrLines)
}
return cancel, waitError, closeStreams, nil
}
func (l *looper) useUnencryptedDNS(fallback bool) {