Skip to content

Commit

Permalink
Refactor the polling
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Dec 12, 2024
1 parent eff14e2 commit 4f4b140
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 42 deletions.
2 changes: 0 additions & 2 deletions chain/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func (b *Bridge) bridgeStream(ctx context.Context, listenConfig bridge.ListenCon
raw = idle.NewIdleConn(raw, idleTimeout)
}
backoff = time.Second / 10
b.logger.Info("Connect", "remote_address", raw.RemoteAddr().String())
go b.stepIgnoreErr(ctx, dialer, raw, dials)
}
}(i, l)
Expand Down Expand Up @@ -289,7 +288,6 @@ func (b *Bridge) bridgeProxy(ctx context.Context, listenConfig bridge.ListenConf
raw = idle.NewIdleConn(raw, idleTimeout)
}
backoff = time.Second / 10
b.logger.Info("Connect", "remote_address", raw.RemoteAddr().String())
go h.ServeConn(raw)
}
}(i, host)
Expand Down
130 changes: 99 additions & 31 deletions chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,36 @@ package chain

import (
"context"
"errors"
"fmt"
"math/rand"
"math"
"net"
"strings"
"sync"
"time"

"github.com/wzshiming/bridge"
"github.com/wzshiming/bridge/config"
"github.com/wzshiming/bridge/internal/scheme"
"github.com/wzshiming/schedialer"
"github.com/wzshiming/schedialer/plugins/probe"
"github.com/wzshiming/schedialer/plugins/roundrobin"
"github.com/wzshiming/bridge/logger"
)

// BridgeChain is a bridger that supports multiple crossing of bridger.
type BridgeChain struct {
DialerFunc func(dialer bridge.Dialer) bridge.Dialer
proto map[string]bridge.Bridger
defaultProto bridge.Bridger

backoffCount map[string]uint64
mutex sync.Mutex
}

// NewBridgeChain create a new BridgeChain.
func NewBridgeChain() *BridgeChain {
return &BridgeChain{
proto: map[string]bridge.Bridger{},
DialerFunc: NewEnvDialer,
proto: map[string]bridge.Bridger{},
DialerFunc: NewEnvDialer,
backoffCount: map[string]uint64{},
}
}

Expand All @@ -35,7 +41,7 @@ func (b *BridgeChain) BridgeChain(ctx context.Context, dialer bridge.Dialer, add
return dialer, nil
}
address := addresses[len(addresses)-1]
d, err := b.Dial(ctx, dialer, strings.Split(address, "|"), "")
d, err := b.multiDial(ctx, dialer, strings.Split(address, "|"))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -65,7 +71,7 @@ func (b *BridgeChain) bridgeChainWithConfig(ctx context.Context, dialer bridge.D
return dialer, nil
}
address := addresses[len(addresses)-1]
d, err := b.Dial(ctx, dialer, address.LB, address.Probe)
d, err := b.multiDial(ctx, dialer, address.LB)
if err != nil {
return nil, err
}
Expand All @@ -76,32 +82,16 @@ func (b *BridgeChain) bridgeChainWithConfig(ctx context.Context, dialer bridge.D
return b.bridgeChainWithConfig(ctx, d, addresses...)
}

func (b *BridgeChain) Dial(ctx context.Context, dialer bridge.Dialer, addresses []string, probeUrl string) (bridge.Dialer, error) {
if len(addresses) == 1 {
return b.dialOne(ctx, dialer, addresses[0])
}
plugins := []schedialer.Plugin{
roundrobin.NewRoundRobinWithIndex(100, rand.Uint64()%uint64(len(addresses))),
}
if probeUrl != "" {
plugins = append(plugins, probe.NewProbe(100, probeUrl))
}
plugin := schedialer.NewPlugins(plugins...)
for _, address := range addresses {
dial, err := b.dialOne(ctx, dialer, address)
if err != nil {
return nil, err
}
proxy := schedialer.Proxy{
Name: address,
Dialer: dial,
}
plugin.AddProxy(ctx, &proxy)
func (b *BridgeChain) multiDial(ctx context.Context, dialer bridge.Dialer, addresses []string) (bridge.Dialer, error) {
useCount := &backoffManager{
addresses: addresses,
dialer: dialer,
bc: b,
}
return schedialer.NewSchedialer(plugin), nil
return useCount, nil
}

func (b *BridgeChain) dialOne(ctx context.Context, dialer bridge.Dialer, address string) (bridge.Dialer, error) {
func (b *BridgeChain) singleDial(ctx context.Context, dialer bridge.Dialer, address string) (bridge.Dialer, error) {
sch, _, ok := scheme.SplitSchemeAddr(address)
if !ok {
return nil, fmt.Errorf("unsupported protocol format %q", address)
Expand All @@ -126,3 +116,81 @@ func (b *BridgeChain) Register(name string, bridger bridge.Bridger) error {
func (b *BridgeChain) RegisterDefault(bridger bridge.Bridger) {
b.defaultProto = bridger
}

type backoffManager struct {
addresses []string
dialer bridge.Dialer

bc *BridgeChain
}

func (u *backoffManager) useLeastAndCount(addresses []string) string {
if len(addresses) == 1 {
return addresses[0]
}
min := uint64(math.MaxUint64)

u.bc.mutex.Lock()
defer u.bc.mutex.Unlock()

var minAddress string
for _, address := range addresses {
if u.bc.backoffCount[address] < min {
min = u.bc.backoffCount[address]
minAddress = address
}
}
u.bc.backoffCount[minAddress]++
return minAddress
}

func (u *backoffManager) backoff(address string, count uint64) {
u.bc.mutex.Lock()
defer u.bc.mutex.Unlock()
u.bc.backoffCount[address] += count
}

func (u *backoffManager) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
var errs []error

timeout := time.Second * 10
dl, ok := ctx.Deadline()
if ok {
timeout = time.Until(dl)
}

period := timeout / time.Duration(len(u.addresses))
if period < time.Second {
period = time.Second
} else if period > 5*time.Second {
period = 5 * time.Second
}

for i := 0; i < len(u.addresses); i++ {
addr := u.useLeastAndCount(u.addresses)

ctx, cancel := context.WithTimeout(ctx, period)

dialer, err := u.bc.singleDial(ctx, u.dialer, addr)
if err != nil {
errs = append(errs, err)
logger.Std.Warn("failed dial", "err", err, "previous", addr)
cancel()
u.backoff(addr, 10)
continue
}
conn, err := dialer.DialContext(ctx, network, address)
if err != nil {
errs = append(errs, err)
logger.Std.Warn("failed dial target", "err", err, "previous", addr, "target", address)
cancel()
u.backoff(addr, 5)
continue
}

logger.Std.Info("success dial target", "previous", addr, "target", address)
cancel()
return conn, nil
}
return nil, fmt.Errorf("all addresses are failed: %w", errors.Join(errs...))
}
1 change: 0 additions & 1 deletion cmd/bridge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func run(ctx context.Context, log *slog.Logger, tasks []config.Chain) {
}
go func(task config.Chain) {
defer wg.Done()
log := log.With("chains", task)
log.Info(chain.ShowChainWithConfig(task))
b := chain.NewBridge(log, dump)
err := b.BridgeWithConfig(ctx, task)
Expand Down
1 change: 0 additions & 1 deletion cmd/bridge/main_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func runWithReload(ctx context.Context, log *slog.Logger, tasks []config.Chain,
wg.Add(1)
go func(ctx context.Context, task config.Chain) {
defer wg.Done()
log := log.With("chains", task)
log.Info(chain.ShowChainWithConfig(task))
for ctx.Err() == nil {
b := chain.NewBridge(log, dump)
Expand Down
3 changes: 1 addition & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ func (c Chain) Unique() string {
}

type Node struct {
Probe string `json:"probe"`
LB []string `json:"lb"`
LB []string `json:"lb"`
}

func (m Node) MarshalJSON() ([]byte, error) {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/wzshiming/httpproxy v0.5.6
github.com/wzshiming/notify v0.1.1
github.com/wzshiming/permuteproxy v0.0.2
github.com/wzshiming/schedialer v0.6.1
github.com/wzshiming/shadowsocks v0.4.1
github.com/wzshiming/socks4 v0.3.2
github.com/wzshiming/socks5 v0.5.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ github.com/wzshiming/notify v0.1.1 h1:rJXoszrkNglhCVyn/IfW500f5cW03q1q7YzL8hsLch
github.com/wzshiming/notify v0.1.1/go.mod h1:SFhsQKZJznzsDcj/Qfo9A65k5IRcpUrpgbLRzZEa/DI=
github.com/wzshiming/permuteproxy v0.0.2 h1:svedMueotlxJk9oJfA0gs8WzRYOdgd0DER9XvKpjwlY=
github.com/wzshiming/permuteproxy v0.0.2/go.mod h1:Ny08A1JbuljB8FeJAOiB7dfvRGCVD8PB9hwrALIvYI8=
github.com/wzshiming/schedialer v0.6.1 h1:4VwtIjVF3uMoWqjbyw3oqYi7WGOEYvDu3L9OYT8sbGY=
github.com/wzshiming/schedialer v0.6.1/go.mod h1:TvVxg4QZIBTJzRfmL/G7g6CzynFQKPmtXtSeJ2c4Lus=
github.com/wzshiming/shadowsocks v0.4.1 h1:tyLYtLSQs90jpMIkD+T8KuZH5foXwOH0ZjxSOb45orI=
github.com/wzshiming/shadowsocks v0.4.1/go.mod h1:CfKm/Keclli2sPGfjskGVH+F3gpF0YPVdcf5t4krypY=
github.com/wzshiming/socks4 v0.3.2 h1:w87nwfgRWteVwIH39nqTur8c+2dcODeLgLrWspcUkSc=
Expand Down
9 changes: 7 additions & 2 deletions logger/logger.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package logger

import (
"context"
"fmt"
"log/slog"
"os"
)

var Std = slog.Default()
var Std = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
AddSource: true,
}))

func Wrap(logger *slog.Logger, name string) *wrap {
return &wrap{
Expand All @@ -18,5 +23,5 @@ type wrap struct {
}

func (w wrap) Println(v ...interface{}) {
w.Logger.Info(fmt.Sprintln(v...))
w.Logger.Log(context.Background(), slog.LevelInfo, fmt.Sprintln(v...))
}

0 comments on commit 4f4b140

Please sign in to comment.