-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathqueue.go
91 lines (71 loc) · 2.12 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package messages
import (
"bytes"
"context"
"database/sql"
"strings"
)
// Execer is the single-method statement-execution interface accepted by this
// package. Its ExecContext method has the same signature as the ExecContext
// methods of *sql.DB and *sql.Tx, so a function that takes an Execer can be
// handed either one and run with or without a transaction without knowing
// the difference.
type Execer interface {
// ExecContext runs query with the supplied args under ctx and returns the
// driver's result or an error.
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
// A Queue represents a Vitess message queue.
// Use NewQueue to build one: it is the only place in this file that fills in
// the cached INSERT statements.
type Queue struct {
// Name is used as the table name of the generated INSERT statements.
Name string
// fieldNames are the user-defined columns. Each one becomes a quoted column
// and one `?` placeholder in the generated INSERT statements, after the
// built-in id column.
fieldNames []string
// SQL strings generated once by NewQueue from Name and fieldNames:
// insertSQL targets (id, fields...), insertScheduledSQL targets
// (time_scheduled, id, fields...).
insertSQL string
insertScheduledSQL string
// s is not read or written anywhere in this file.
// NOTE(review): presumably the queue's active subscription — confirm against
// the code that declares the subscription type.
s *subscription
}
// NewQueue returns a queue definition but doesn't make any network requests.
//
// name is the queue's table name and fieldNames are the user-defined columns,
// in the order their values will be bound. The two INSERT statements are
// generated here, once, and cached on the returned Queue.
//
// fieldNames is copied, so the caller may reuse or modify its slice afterwards
// without the Queue's column list drifting away from the cached statements.
func NewQueue(name string, fieldNames []string) *Queue {
	q := &Queue{
		Name: name,
		// Defensive copy: the cached SQL below is derived from this exact
		// list, so it must not change behind the Queue's back. A nil input
		// stays nil.
		fieldNames: append([]string(nil), fieldNames...),
	}

	// only do this string manipulation once
	q.insertSQL = q.generateInsertSQL()
	q.insertScheduledSQL = q.generateInsertScheduledSQL()

	return q
}
// generateInsertSQL does the string manipulation to generate the insert
// statement. The result has the form
//
//	INSERT INTO `<Name>` (id, `<field>`, ...) VALUES (?,?,...)
//
// with one placeholder for id followed by one per entry in q.fieldNames.
//
// The queue name and field names are emitted as backtick-quoted identifiers.
// A backtick inside a name is doubled so it cannot terminate the quoting and
// produce malformed SQL; names without backticks are emitted unchanged.
func (q *Queue) generateInsertSQL() string {
	// quoteIdent wraps an identifier in backticks, doubling any embedded ones.
	quoteIdent := func(name string) string {
		return "`" + strings.Replace(name, "`", "``", -1) + "`"
	}

	buf := bytes.Buffer{}

	// generate default insert into queue with required fields
	buf.WriteString("INSERT INTO ")
	buf.WriteString(quoteIdent(q.Name))
	buf.WriteString(" (id")

	// add quoted user fields to the insert statement
	for _, f := range q.fieldNames {
		buf.WriteString(", ")
		buf.WriteString(quoteIdent(f))
	}

	// one placeholder for id, then one per user field
	buf.WriteString(") VALUES (?")
	buf.WriteString(strings.Repeat(",?", len(q.fieldNames)))

	// close VALUES
	buf.WriteString(")")

	return buf.String()
}
// generateInsertScheduledSQL does the string manipulation to generate the
// scheduled insert statement. The result has the form
//
//	INSERT INTO `<Name>` (time_scheduled, id, `<field>`, ...) VALUES (?,?,...)
//
// with placeholders for time_scheduled and id followed by one per entry in
// q.fieldNames.
//
// The queue name and field names are emitted as backtick-quoted identifiers.
// A backtick inside a name is doubled so it cannot terminate the quoting and
// produce malformed SQL; names without backticks are emitted unchanged.
func (q *Queue) generateInsertScheduledSQL() string {
	// quoteIdent wraps an identifier in backticks, doubling any embedded ones.
	quoteIdent := func(name string) string {
		return "`" + strings.Replace(name, "`", "``", -1) + "`"
	}

	buf := bytes.Buffer{}

	// generate default insert into queue with required fields
	buf.WriteString("INSERT INTO ")
	buf.WriteString(quoteIdent(q.Name))
	buf.WriteString(" (time_scheduled, id")

	// add quoted user fields to the insert statement
	for _, f := range q.fieldNames {
		buf.WriteString(", ")
		buf.WriteString(quoteIdent(f))
	}

	// placeholders for time_scheduled and id, then one per user field
	buf.WriteString(") VALUES (?,?")
	buf.WriteString(strings.Repeat(",?", len(q.fieldNames)))

	// close VALUES
	buf.WriteString(")")

	return buf.String()
}