Feature: more robust updater DNS resolution
- Parallel resolver to resolve multiple hosts - Repeat resolver to repeat the resolution of a single host - Additional parameters for fault tolerance - Do not update servers if too many DNS resolutions failed (e.g. more than 10%) - New resolver package inside the updater package
This commit is contained in:
15
internal/updater/resolver/ips.go
Normal file
15
internal/updater/resolver/ips.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package resolver
|
||||
|
||||
import "net"
|
||||
|
||||
// uniqueIPsToSlice converts a set of textual IP addresses into a slice of
// net.IP. IPv4 addresses are stored in their 4-byte form. Keys that cannot
// be parsed as an IP address are skipped so the result never contains a nil
// address. The returned slice is never nil, and its order is unspecified
// since it follows the map iteration order.
func uniqueIPsToSlice(uniqueIPs map[string]struct{}) (ips []net.IP) {
	ips = make([]net.IP, 0, len(uniqueIPs))
	for key := range uniqueIPs {
		ip := net.ParseIP(key)
		if ip == nil {
			// net.ParseIP returns nil for invalid text.
			continue
		}
		if ipv4 := ip.To4(); ipv4 != nil {
			ip = ipv4
		}
		ips = append(ips, ip)
	}
	return ips
}
|
||||
41
internal/updater/resolver/ips_test.go
Normal file
41
internal/updater/resolver/ips_test.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_uniqueIPsToSlice(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCases := map[string]struct {
|
||||
inputIPs map[string]struct{}
|
||||
outputIPs []net.IP
|
||||
}{
|
||||
"nil": {
|
||||
inputIPs: nil,
|
||||
outputIPs: []net.IP{},
|
||||
},
|
||||
"empty": {
|
||||
inputIPs: map[string]struct{}{},
|
||||
outputIPs: []net.IP{},
|
||||
},
|
||||
"single IPv4": {
|
||||
inputIPs: map[string]struct{}{"1.1.1.1": {}},
|
||||
outputIPs: []net.IP{{1, 1, 1, 1}},
|
||||
},
|
||||
"two IPv4s": {
|
||||
inputIPs: map[string]struct{}{"1.1.1.1": {}, "1.1.2.1": {}},
|
||||
outputIPs: []net.IP{{1, 1, 1, 1}, {1, 1, 2, 1}},
|
||||
},
|
||||
}
|
||||
for name, testCase := range testCases {
|
||||
testCase := testCase
|
||||
t.Run(name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
outputIPs := uniqueIPsToSlice(testCase.inputIPs)
|
||||
assert.Equal(t, testCase.outputIPs, outputIPs)
|
||||
})
|
||||
}
|
||||
}
|
||||
17
internal/updater/resolver/net.go
Normal file
17
internal/updater/resolver/net.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
)
|
||||
|
||||
// newResolver returns a *net.Resolver using Go's built-in DNS client
// (PreferGo) which sends its queries to resolverAddress on port 53.
//
// The network and address arguments given to Dial by the Go resolver are
// ignored: every connection is dialed over UDP to resolverAddress.
// NOTE(review): since "udp" is forced, a retry over TCP requested by the Go
// resolver would also go over UDP — confirm this is intended.
func newResolver(resolverAddress string) *net.Resolver {
	dialer := net.Dialer{}
	serverAddress := net.JoinHostPort(resolverAddress, "53")
	return &net.Resolver{
		PreferGo: true,
		Dial: func(ctx context.Context, _, _ string) (net.Conn, error) {
			return dialer.DialContext(ctx, "udp", serverAddress)
		},
	}
}
|
||||
127
internal/updater/resolver/parallel.go
Normal file
127
internal/updater/resolver/parallel.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
type Parallel interface {
|
||||
Resolve(ctx context.Context, hosts []string, settings ParallelSettings) (
|
||||
hostToIPs map[string][]net.IP, warnings []string, err error)
|
||||
}
|
||||
|
||||
type parallel struct {
|
||||
repeatResolver Repeat
|
||||
}
|
||||
|
||||
func NewParallelResolver(address string) Parallel {
|
||||
return ¶llel{
|
||||
repeatResolver: NewRepeat(address),
|
||||
}
|
||||
}
|
||||
|
||||
type ParallelSettings struct {
|
||||
Repeat RepeatSettings
|
||||
FailEarly bool
|
||||
// Maximum ratio of the hosts failing DNS resolution
|
||||
// divided by the total number of hosts requested.
|
||||
// This value is between 0 and 1. Note this is only
|
||||
// applicable if FailEarly is not set to true.
|
||||
MaxFailRatio float64
|
||||
// MinFound is the minimum number of hosts to be found.
|
||||
// If it is bigger than the number of hosts given, it
|
||||
// is set to the number of hosts given.
|
||||
MinFound int
|
||||
}
|
||||
|
||||
// parallelResult is the successful resolution result for a single host.
type parallelResult struct {
	host string
	IPs  []net.IP
}
|
||||
|
||||
// Sentinel errors returned (wrapped) by the parallel resolver.
var (
	// ErrMinFound indicates fewer hosts were resolved than required.
	ErrMinFound = errors.New("not enough hosts found")
	// ErrMaxFailRatio indicates too many hosts failed to resolve.
	ErrMaxFailRatio = errors.New("maximum failure ratio reached")
)
|
||||
|
||||
func (pr *parallel) Resolve(ctx context.Context, hosts []string,
|
||||
settings ParallelSettings) (hostToIPs map[string][]net.IP, warnings []string, err error) {
|
||||
minFound := settings.MinFound
|
||||
if minFound > len(hosts) {
|
||||
minFound = len(hosts)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
results := make(chan parallelResult)
|
||||
defer close(results)
|
||||
errors := make(chan error)
|
||||
defer close(errors)
|
||||
|
||||
for _, host := range hosts {
|
||||
go pr.resolveAsync(ctx, host, settings.Repeat, results, errors)
|
||||
}
|
||||
|
||||
hostToIPs = make(map[string][]net.IP, len(hosts))
|
||||
maxFails := int(settings.MaxFailRatio * float64(len(hosts)))
|
||||
|
||||
for range hosts {
|
||||
select {
|
||||
case newErr := <-errors:
|
||||
if settings.FailEarly {
|
||||
if err == nil {
|
||||
// only set the error to the first error encountered
|
||||
// and not the context canceled errors coming after.
|
||||
err = newErr
|
||||
cancel()
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// do not add warnings coming from the call to cancel()
|
||||
if len(warnings) < maxFails {
|
||||
warnings = append(warnings, newErr.Error())
|
||||
}
|
||||
|
||||
if len(warnings) == maxFails {
|
||||
cancel() // cancel only once when we reach maxFails
|
||||
}
|
||||
case result := <-results:
|
||||
hostToIPs[result.host] = result.IPs
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil { // fail early
|
||||
return nil, warnings, err
|
||||
}
|
||||
|
||||
if len(hostToIPs) < minFound {
|
||||
return nil, warnings,
|
||||
fmt.Errorf("%w: found %d hosts but expected at least %d",
|
||||
ErrMinFound, len(hostToIPs), minFound)
|
||||
}
|
||||
|
||||
failureRatio := float64(len(warnings)) / float64(len(hosts))
|
||||
if failureRatio > settings.MaxFailRatio {
|
||||
return hostToIPs, warnings,
|
||||
fmt.Errorf("%w: %.2f failure ratio reached", ErrMaxFailRatio, failureRatio)
|
||||
}
|
||||
|
||||
return hostToIPs, warnings, nil
|
||||
}
|
||||
|
||||
func (pr *parallel) resolveAsync(ctx context.Context, host string,
|
||||
settings RepeatSettings, results chan<- parallelResult, errors chan<- error) {
|
||||
IPs, err := pr.repeatResolver.Resolve(ctx, host, settings)
|
||||
if err != nil {
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
results <- parallelResult{
|
||||
host: host,
|
||||
IPs: IPs,
|
||||
}
|
||||
}
|
||||
141
internal/updater/resolver/repeat.go
Normal file
141
internal/updater/resolver/repeat.go
Normal file
@@ -0,0 +1,141 @@
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Repeat interface {
|
||||
Resolve(ctx context.Context, host string, settings RepeatSettings) (IPs []net.IP, err error)
|
||||
}
|
||||
|
||||
// repeat holds the DNS resolver used for the lookups.
type repeat struct {
	resolver *net.Resolver
}
|
||||
|
||||
func NewRepeat(address string) Repeat {
|
||||
return &repeat{
|
||||
resolver: newResolver(address),
|
||||
}
|
||||
}
|
||||
|
||||
// RepeatSettings contains the settings for the repeated
// resolution of a single host.
type RepeatSettings struct {
	// MaxDuration is the timeout for the whole repeated resolution.
	MaxDuration time.Duration
	// BetweenDuration is the duration to wait between two resolutions.
	BetweenDuration time.Duration
	// MaxNoNew is the number of resolutions finding no new IP address
	// at which the resolution stops. It is ignored if not positive.
	MaxNoNew int
	// Maximum consecutive DNS resolution failures.
	// It is ignored if not positive.
	MaxFails int
	// SortIPs, if set to true, sorts the IP addresses returned
	// by comparing their bytes.
	SortIPs bool
}
|
||||
|
||||
func (r *repeat) Resolve(ctx context.Context, host string, settings RepeatSettings) (ips []net.IP, err error) {
|
||||
timedCtx, cancel := context.WithTimeout(ctx, settings.MaxDuration)
|
||||
defer cancel()
|
||||
|
||||
noNewCounter := 0
|
||||
failCounter := 0
|
||||
uniqueIPs := make(map[string]struct{})
|
||||
|
||||
for err == nil {
|
||||
// TODO
|
||||
// - one resolving every 100ms for round robin DNS responses
|
||||
// - one every second for time based DNS cycling responses
|
||||
noNewCounter, failCounter, err = r.resolveOnce(ctx, timedCtx, host, settings, uniqueIPs, noNewCounter, failCounter)
|
||||
}
|
||||
|
||||
if len(uniqueIPs) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ips = uniqueIPsToSlice(uniqueIPs)
|
||||
|
||||
if settings.SortIPs {
|
||||
sort.Slice(ips, func(i, j int) bool {
|
||||
return bytes.Compare(ips[i], ips[j]) < 1
|
||||
})
|
||||
}
|
||||
|
||||
return ips, nil
|
||||
}
|
||||
|
||||
// Sentinel errors returned (wrapped) by the repeat resolver.
var (
	// ErrMaxNoNew indicates the limit of resolutions finding
	// no new IP address was reached.
	ErrMaxNoNew = errors.New("reached the maximum number of no new update")
	// ErrMaxFails indicates the limit of consecutive
	// resolution failures was reached.
	ErrMaxFails = errors.New("reached the maximum number of consecutive failures")
	// ErrTimeout indicates the maximum duration was reached.
	ErrTimeout = errors.New("reached the timeout")
)
|
||||
|
||||
func (r *repeat) resolveOnce(ctx, timedCtx context.Context, host string,
|
||||
settings RepeatSettings, uniqueIPs map[string]struct{}, noNewCounter, failCounter int) (
|
||||
newNoNewCounter, newFailCounter int, err error) {
|
||||
IPs, err := r.lookupIPs(timedCtx, host)
|
||||
if err != nil {
|
||||
failCounter++
|
||||
if settings.MaxFails > 0 && failCounter == settings.MaxFails {
|
||||
return noNewCounter, failCounter, fmt.Errorf("%w: %d failed attempts resolving %s: %s",
|
||||
ErrMaxFails, settings.MaxFails, host, err)
|
||||
}
|
||||
// it's fine to fail some of the resolutions
|
||||
return noNewCounter, failCounter, nil
|
||||
}
|
||||
failCounter = 0 // reset the counter if we had no error
|
||||
|
||||
anyNew := false
|
||||
for _, IP := range IPs {
|
||||
key := IP.String()
|
||||
if _, ok := uniqueIPs[key]; !ok {
|
||||
anyNew = true
|
||||
uniqueIPs[key] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
if !anyNew {
|
||||
noNewCounter++
|
||||
}
|
||||
|
||||
if settings.MaxNoNew > 0 && noNewCounter == settings.MaxNoNew {
|
||||
// we reached the maximum number of resolutions without
|
||||
// finding any new IP address to our unique IP addresses set.
|
||||
return noNewCounter, failCounter,
|
||||
fmt.Errorf("%w: %d times no updated for %d IP addresses found",
|
||||
ErrMaxNoNew, noNewCounter, len(uniqueIPs))
|
||||
}
|
||||
|
||||
timer := time.NewTimer(settings.BetweenDuration)
|
||||
select {
|
||||
case <-timer.C:
|
||||
return noNewCounter, failCounter, nil
|
||||
case <-ctx.Done():
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
return noNewCounter, failCounter, ctx.Err()
|
||||
case <-timedCtx.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
// timedCtx was canceled from its parent context
|
||||
return noNewCounter, failCounter, err
|
||||
}
|
||||
return noNewCounter, failCounter,
|
||||
fmt.Errorf("%w: %s", ErrTimeout, timedCtx.Err())
|
||||
}
|
||||
}
|
||||
|
||||
func (r *repeat) lookupIPs(ctx context.Context, host string) (ips []net.IP, err error) {
|
||||
addresses, err := r.resolver.LookupIPAddr(ctx, host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ips = make([]net.IP, 0, len(addresses))
|
||||
for i := range addresses {
|
||||
ip := addresses[i].IP
|
||||
if ip == nil {
|
||||
continue
|
||||
}
|
||||
ips = append(ips, ip)
|
||||
}
|
||||
return ips, nil
|
||||
}
|
||||
3
internal/updater/resolver/resolver.go
Normal file
3
internal/updater/resolver/resolver.go
Normal file
@@ -0,0 +1,3 @@
|
||||
// Package resolver defines custom resolvers to resolve
|
||||
// hosts multiple times with adjustable settings.
|
||||
package resolver
|
||||
Reference in New Issue
Block a user