chore(all): memory- and thread-safe storage

- settings: get filter choices from storage for settings validation
- updater: write updated servers directly to storage
- storage: minimize deep copying and data duplication
- storage: add a merged-servers mutex for thread safety (see the sketch below)
- connection: filter servers in storage
- formatter: format servers to Markdown in storage
- PIA: get server by name from storage directly
- updater: get servers count from storage directly
- updater: equality check done in storage, fixes #882
Author: Quentin McGaw
Date:   2022-06-05 14:58:46 +00:00
Parent: 1e6b4ed5eb
Commit: 36b504609b

84 changed files with 1267 additions and 877 deletions
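
The storage bullets above boil down to one pattern: a single Storage type that owns the merged servers behind a mutex, with the rest of the code calling narrow methods instead of copying data around. Below is a minimal, hypothetical Go sketch of that shape; the package, type and field names are assumptions for illustration, not gluetun's actual implementation (which stores models.Server values and more state):

    // Package storagesketch is a hypothetical, minimal illustration of a
    // mutex-guarded server store; it is NOT gluetun's real storage package.
    package storagesketch

    import "sync"

    // Server is a stand-in for gluetun's models.Server.
    type Server struct {
        Hostname string
        IPs      []string
    }

    // Storage guards its merged servers with a mutex so the settings
    // validator, connection filter, formatter and updater can all
    // access it concurrently.
    type Storage struct {
        mergedMutex   sync.RWMutex
        mergedServers map[string][]Server // provider name -> servers
    }

    func New() *Storage {
        return &Storage{mergedServers: make(map[string][]Server)}
    }

    // GetServersCount reads under the lock without copying any server data.
    func (s *Storage) GetServersCount(provider string) (count int) {
        s.mergedMutex.RLock()
        defer s.mergedMutex.RUnlock()
        return len(s.mergedServers[provider])
    }

    // SetServers stores the slice as-is ("minimal deep copying"), so the
    // caller must not mutate servers after this call.
    func (s *Storage) SetServers(provider string, servers []Server) (err error) {
        s.mergedMutex.Lock()
        defer s.mergedMutex.Unlock()
        s.mergedServers[provider] = servers
        return nil
    }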


@@ -9,7 +9,6 @@ import (
     "github.com/qdm12/gluetun/internal/configuration/settings"
     "github.com/qdm12/gluetun/internal/constants"
     "github.com/qdm12/gluetun/internal/models"
-    "github.com/qdm12/gluetun/internal/storage"
     "github.com/qdm12/gluetun/internal/updater"
 )
@@ -24,16 +23,14 @@ type Looper interface {
 }
 type Updater interface {
-    UpdateServers(ctx context.Context) (allServers models.AllServers, err error)
+    UpdateServers(ctx context.Context) (err error)
 }
 type looper struct {
     state state
     // Objects
-    updater       Updater
-    flusher       storage.Flusher
-    setAllServers func(allServers models.AllServers)
-    logger        Logger
+    updater Updater
+    logger  Logger
     // Internal channels and locks
     loopLock sync.Mutex
     start    chan struct{}
@@ -49,32 +46,35 @@ type looper struct {
 const defaultBackoffTime = 5 * time.Second
+type Storage interface {
+    SetServers(provider string, servers []models.Server) (err error)
+    GetServersCount(provider string) (count int)
+    ServersAreEqual(provider string, servers []models.Server) (equal bool)
+}
 type Logger interface {
     Info(s string)
     Warn(s string)
     Error(s string)
 }
-func NewLooper(settings settings.Updater, currentServers models.AllServers,
-    flusher storage.Flusher, setAllServers func(allServers models.AllServers),
+func NewLooper(settings settings.Updater, storage Storage,
     client *http.Client, logger Logger) Looper {
     return &looper{
         state: state{
             status:   constants.Stopped,
             settings: settings,
         },
-        updater:       updater.New(settings, client, currentServers, logger),
-        flusher:       flusher,
-        setAllServers: setAllServers,
-        logger:        logger,
-        start:         make(chan struct{}),
-        running:       make(chan models.LoopStatus),
-        stop:          make(chan struct{}),
-        stopped:       make(chan struct{}),
-        updateTicker:  make(chan struct{}),
-        timeNow:       time.Now,
-        timeSince:     time.Since,
-        backoffTime:   defaultBackoffTime,
+        updater:      updater.New(settings, client, storage, logger),
+        logger:       logger,
+        start:        make(chan struct{}),
+        running:      make(chan models.LoopStatus),
+        stop:         make(chan struct{}),
+        stopped:      make(chan struct{}),
+        updateTicker: make(chan struct{}),
+        timeNow:      time.Now,
+        timeSince:    time.Since,
+        backoffTime:  defaultBackoffTime,
     }
 }
@@ -106,20 +106,19 @@ func (l *looper) Run(ctx context.Context, done chan<- struct{}) {
     for ctx.Err() == nil {
         updateCtx, updateCancel := context.WithCancel(ctx)
-        serversCh := make(chan models.AllServers)
         errorCh := make(chan error)
         runWg := &sync.WaitGroup{}
         runWg.Add(1)
         go func() {
             defer runWg.Done()
-            servers, err := l.updater.UpdateServers(updateCtx)
+            err := l.updater.UpdateServers(updateCtx)
             if err != nil {
                 if updateCtx.Err() == nil {
                     errorCh <- err
                 }
                 return
             }
-            serversCh <- servers
+            l.state.setStatusWithLock(constants.Completed)
         }()
         if !crashed {
@@ -148,16 +147,7 @@ func (l *looper) Run(ctx context.Context, done chan<- struct{}) {
             updateCancel()
             runWg.Wait()
             l.stopped <- struct{}{}
-        case servers := <-serversCh:
-            l.setAllServers(servers)
-            if err := l.flusher.FlushToFile(&servers); err != nil {
-                l.logger.Error(err.Error())
-            }
-            runWg.Wait()
-            l.state.setStatusWithLock(constants.Completed)
-            l.logger.Info("Updated servers information")
         case err := <-errorCh:
-            close(serversCh)
             runWg.Wait()
             l.state.setStatusWithLock(constants.Crashed)
             l.logAndWait(ctx, err)
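
Condensed, the Run loop change above is this pattern: since UpdateServers now writes to storage itself, the worker goroutine owns the success path and the loop only needs an error channel. A runnable, hypothetical distillation (update, the status prints and the simulated failure are stand-ins, not gluetun code):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "sync"
    )

    // update stands in for l.updater.UpdateServers: it persists results
    // itself (via storage), so success sends nothing back to the loop.
    func update(ctx context.Context) error {
        return errors.New("simulated fetch failure")
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        errorCh := make(chan error)
        runWg := &sync.WaitGroup{}
        runWg.Add(1)
        go func() {
            defer runWg.Done()
            if err := update(ctx); err != nil {
                if ctx.Err() == nil { // drop the error if we were cancelled anyway
                    errorCh <- err
                }
                return
            }
            fmt.Println("status: completed") // success path lives in the worker
        }()

        err := <-errorCh
        runWg.Wait()
        fmt.Println("status: crashed:", err)
    }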


@@ -3,8 +3,6 @@ package updater
 import (
     "context"
     "fmt"
-    "reflect"
-    "time"
     "github.com/qdm12/gluetun/internal/constants/providers"
     "github.com/qdm12/gluetun/internal/models"
@@ -31,18 +29,25 @@ import (
 )
 func (u *Updater) updateProvider(ctx context.Context, provider string) (err error) {
-    existingServers := u.getProviderServers(provider)
-    minServers := getMinServers(existingServers)
+    existingServersCount := u.storage.GetServersCount(provider)
+    minServers := getMinServers(existingServersCount)
     servers, err := u.getServers(ctx, provider, minServers)
     if err != nil {
-        return err
+        return fmt.Errorf("cannot get servers: %w", err)
     }
-    if reflect.DeepEqual(existingServers, servers) {
+    if u.storage.ServersAreEqual(provider, servers) {
         return nil
     }
-    u.patchProvider(provider, servers)
+    // Note the servers variable must NOT BE MUTATED after this call,
+    // since the implementation does not deep copy the servers.
+    // TODO set in storage in provider updater directly, server by server,
+    // to avoid accumulating server data in memory.
+    err = u.storage.SetServers(provider, servers)
+    if err != nil {
+        return fmt.Errorf("cannot set servers to storage: %w", err)
+    }
     return nil
 }
@@ -101,25 +106,7 @@ func (u *Updater) getServers(ctx context.Context, provider string,
     return providerUpdater.GetServers(ctx, minServers)
 }
-func (u *Updater) getProviderServers(provider string) (servers []models.Server) {
-    providerServers, ok := u.servers.ProviderToServers[provider]
-    if !ok {
-        panic(fmt.Sprintf("provider %s is unknown", provider))
-    }
-    return providerServers.Servers
-}
-func getMinServers(servers []models.Server) (minServers int) {
+func getMinServers(existingServersCount int) (minServers int) {
     const minRatio = 0.8
-    return int(minRatio * float64(len(servers)))
-}
-func (u *Updater) patchProvider(provider string, servers []models.Server) {
-    providerServers, ok := u.servers.ProviderToServers[provider]
-    if !ok {
-        panic(fmt.Sprintf("provider %s is unknown", provider))
-    }
-    providerServers.Timestamp = time.Now().Unix()
-    providerServers.Servers = servers
-    u.servers.ProviderToServers[provider] = providerServers
+    return int(minRatio * float64(existingServersCount))
 }
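
A detail worth spelling out in the hunk above: minServers acts as a sanity floor of 80% of the count already in storage, so, judging from the call site, a fetch that cannot reach that many servers is treated as a failure instead of overwriting good data. The arithmetic, standalone:

    package main

    import "fmt"

    // Same computation as getMinServers in the diff above.
    func getMinServers(existingServersCount int) (minServers int) {
        const minRatio = 0.8
        return int(minRatio * float64(existingServersCount))
    }

    func main() {
        fmt.Println(getMinServers(100)) // 80: smaller fetches are rejected upstream
        fmt.Println(getMinServers(0))   // 0: an empty storage accepts any fetch size
    }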


@@ -19,7 +19,7 @@ type Updater struct {
     options settings.Updater
     // state
-    servers models.AllServers
+    storage Storage
     // Functions for tests
     logger Logger
@@ -29,6 +29,12 @@ type Updater struct {
     unzipper unzip.Unzipper
 }
+type Storage interface {
+    SetServers(provider string, servers []models.Server) (err error)
+    GetServersCount(provider string) (count int)
+    ServersAreEqual(provider string, servers []models.Server) (equal bool)
+}
 type Logger interface {
     Info(s string)
     Warn(s string)
@@ -36,20 +42,20 @@ type Logger interface {
 }
 func New(settings settings.Updater, httpClient *http.Client,
-    currentServers models.AllServers, logger Logger) *Updater {
+    storage Storage, logger Logger) *Updater {
     unzipper := unzip.New(httpClient)
     return &Updater{
+        options:   settings,
+        storage:   storage,
         logger:    logger,
         timeNow:   time.Now,
         presolver: resolver.NewParallelResolver(settings.DNSAddress.String()),
         client:    httpClient,
         unzipper:  unzipper,
-        options:   settings,
-        servers:   currentServers,
     }
 }
-func (u *Updater) UpdateServers(ctx context.Context) (allServers models.AllServers, err error) {
+func (u *Updater) UpdateServers(ctx context.Context) (err error) {
     caser := cases.Title(language.English)
     for _, provider := range u.options.Providers {
         u.logger.Info("updating " + caser.String(provider) + " servers...")
@@ -62,17 +68,17 @@ func (u *Updater) UpdateServers(ctx context.Context) (allServers models.AllServers, err error) {
         // return the only error for the single provider.
         if len(u.options.Providers) == 1 {
-            return allServers, err
+            return err
         }
         // stop updating the next providers if context is canceled.
         if ctxErr := ctx.Err(); ctxErr != nil {
-            return allServers, ctxErr
+            return ctxErr
         }
         // Log the error and continue updating the next provider.
         u.logger.Error(err.Error())
     }
-    return u.servers, nil
+    return nil
 }