-
Notifications
You must be signed in to change notification settings - Fork 85
/
Copy pathstart.go
191 lines (161 loc) · 4.96 KB
/
start.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package relayer
import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/fasthttp/websocket"
"github.com/nbd-wtf/go-nostr"
"github.com/rs/cors"
"golang.org/x/time/rate"
)
// Server is a base for package users to implement nostr relays.
// It can serve HTTP requests and websockets, passing control over to a relay implementation.
//
// To implement a relay, it is enough to satisfy [Relay] interface. Other interfaces are
// [Informationer], [CustomWebSocketHandler], [ShutdownAware] and AdvancedXxx types.
// See their respective doc comments.
//
// The basic usage is to call Start or StartConf, which starts serving immediately.
// For a more fine-grained control, use NewServer.
// See [basic/main.go], [whitelisted/main.go], [expensive/main.go] and [rss-bridge/main.go]
// for example implementations.
//
// The following resource is a good starting point for details on what nostr protocol is
// and how it works: https://github.com/nostr-protocol/nostr
type Server struct {
// Default logger, as set by NewServer, is a stdlib logger prefixed with [Relay.Name],
// outputting to stderr.
Log Logger
options *Options
relay Relay
// keep a connection reference to all connected clients for Server.Shutdown
clientsMu sync.Mutex
clients map[*websocket.Conn]struct{}
// in case you call Server.Start
Addr string
serveMux *http.ServeMux
httpServer *http.Server
}
func (s *Server) Router() *http.ServeMux {
return s.serveMux
}
// NewServer initializes the relay and its storage using their respective Init methods,
// returning any non-nil errors, and returns a Server ready to listen for HTTP requests.
func NewServer(relay Relay, opts ...Option) (*Server, error) {
options := DefaultOptions()
for _, opt := range opts {
opt(options)
}
srv := &Server{
Log: defaultLogger(relay.Name() + ": "),
relay: relay,
clients: make(map[*websocket.Conn]struct{}),
serveMux: &http.ServeMux{},
options: options,
}
if storage := relay.Storage(context.Background()); storage != nil {
if err := storage.Init(); err != nil {
return nil, fmt.Errorf("storage init: %w", err)
}
}
// init the relay
if err := relay.Init(); err != nil {
return nil, fmt.Errorf("relay init: %w", err)
}
// start listening from events from other sources, if any
if inj, ok := relay.(Injector); ok {
go func() {
for event := range inj.InjectEvents() {
notifyListeners(&event)
}
}()
}
return srv, nil
}
// ServeHTTP implements http.Handler interface.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Upgrade") == "websocket" {
s.HandleWebsocket(w, r)
} else if r.Header.Get("Accept") == "application/nostr+json" {
s.HandleNIP11(w, r)
} else {
s.serveMux.ServeHTTP(w, r)
}
}
func (s *Server) Start(host string, port int, started ...chan bool) error {
addr := net.JoinHostPort(host, strconv.Itoa(port))
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
s.Addr = ln.Addr().String()
s.httpServer = &http.Server{
Handler: cors.Default().Handler(s),
Addr: addr,
WriteTimeout: 2 * time.Second,
ReadTimeout: 2 * time.Second,
IdleTimeout: 30 * time.Second,
}
// notify caller that we're starting
for _, started := range started {
close(started)
}
if err := s.httpServer.Serve(ln); err == http.ErrServerClosed {
return nil
} else if err != nil {
return err
} else {
return nil
}
}
// Shutdown sends a websocket close control message to all connected clients.
//
// If the relay is ShutdownAware, Shutdown calls its OnShutdown, passing the context as is.
// Note that the HTTP server make some time to shutdown and so the context deadline,
// if any, may have been shortened by the time OnShutdown is called.
func (s *Server) Shutdown(ctx context.Context) {
s.httpServer.Shutdown(ctx)
s.clientsMu.Lock()
defer s.clientsMu.Unlock()
for conn := range s.clients {
conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))
conn.Close()
delete(s.clients, conn)
}
if f, ok := s.relay.(ShutdownAware); ok {
f.OnShutdown(ctx)
}
}
type Option func(*Options)
type Options struct {
perConnectionLimiter *rate.Limiter
skipEventFunc func(*nostr.Event) bool
}
func DefaultOptions() *Options {
return &Options{}
}
func WithPerConnectionLimiter(rps rate.Limit, burst int) Option {
return func(o *Options) {
o.perConnectionLimiter = rate.NewLimiter(rps, burst)
}
}
func WithSkipEventFunc(skipEventFunc func(*nostr.Event) bool) Option {
return func(o *Options) {
o.skipEventFunc = skipEventFunc
}
}
func defaultLogger(prefix string) Logger {
l := log.New(os.Stderr, "", log.LstdFlags|log.Lmsgprefix)
l.SetPrefix(prefix)
return stdLogger{l}
}
type stdLogger struct{ log *log.Logger }
func (l stdLogger) Infof(format string, v ...any) { l.log.Printf(format, v...) }
func (l stdLogger) Warningf(format string, v ...any) { l.log.Printf(format, v...) }
func (l stdLogger) Errorf(format string, v ...any) { l.log.Printf(format, v...) }