-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhll.go
146 lines (125 loc) · 3.84 KB
/
hll.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package zetasketch
import (
"fmt"
"github.com/bsm/zetasketch/hllplus"
"github.com/bsm/zetasketch/internal/zetasketch"
pb "github.com/bsm/zetasketch/internal/zetasketch"
"google.golang.org/protobuf/proto"
)
// HLL is a HyperLogLog++ aggregator that estimates the cardinality of a
// multiset.
//
// The precision trades accuracy for memory: the aggregator needs at most
// 2^precision bytes (less for small cardinalities) and has a relative error of
// 1.04 / sqrt(2^precision). A typical value used at Google is 15, which gives
// an error of about 0.6% while requiring an upper bound of 32KiB of memory.
//
// An HLL is not designed to be thread safe.
type HLL struct {
	// h is the underlying HLL++ sketch.
	h *hllplus.HLL
	// n is the number of values seen: incremented by Add and summed by Merge.
	n int64
}
// NewHLL creates a new, empty HLL++ aggregator configured by cfg.
//
// cfg may be nil; missing or out-of-range settings are replaced by the
// defaults described on HLLConfig. NewHLL panics if the underlying sketch
// cannot be constructed from the resolved precisions.
func NewHLL(cfg *HLLConfig) *HLL {
	precision, sparsePrecision := cfg.precision(), cfg.sparsePrecision()

	sketch, err := hllplus.New(precision, sparsePrecision)
	if err != nil {
		panic(err)
	}
	return &HLL{h: sketch}
}
// Add records value v in the aggregator: it bumps the count of values seen
// and feeds the 64-bit hash of v into the underlying sketch.
func (h *HLL) Add(v Value) {
	h.n++

	hash := v.Sum64()
	h.h.Add(hash)
}
// NumValues reports how many values the aggregator has seen so far, counting
// duplicates and values carried over from merged aggregators.
func (h *HLL) NumValues() int64 {
	return h.n
}
// Merge merges aggregator other into h, combining the sketch state and adding
// other's count of values seen to h's.
//
// It returns an error, leaving h untouched, when other is not a *HLL or when
// it is a nil *HLL.
func (h *HLL) Merge(other Aggregator) error {
	h2, ok := other.(*HLL)
	if !ok {
		return fmt.Errorf("cannot merge %T into %T", other, h)
	}
	// A nil *HLL stored in the interface still passes the type assertion
	// above; reject it here instead of panicking on the field access below.
	if h2 == nil {
		return fmt.Errorf("cannot merge nil %T into %T", other, h)
	}

	h.h.Merge(h2.h)
	h.n += h2.n
	return nil
}
// Result returns the estimated number of unique values seen, as computed by
// the underlying sketch.
func (h *HLL) Result() int64 {
	estimate := h.h.Estimate()
	return estimate
}
// MarshalBinary encodes the aggregator as a serialized AggregatorStateProto
// message.
func (h *HLL) MarshalBinary() ([]byte, error) {
	msg := h.proto()
	return proto.Marshal(msg)
}
// UnmarshalBinary replaces the state of h with the aggregator encoded in
// data, which must be a serialized AggregatorStateProto message.
//
// If data cannot be decoded or fails validation, the error is returned and h
// is left unchanged.
func (h *HLL) UnmarshalBinary(data []byte) error {
	msg := &pb.AggregatorStateProto{}
	err := proto.Unmarshal(data, msg)
	if err != nil {
		return err
	}
	return h.fromProto(msg)
}
// proto builds the AggregatorStateProto representation of h: aggregator type
// HYPERLOGLOG_PLUS_UNIQUE, encoding version 2, the current count of values
// seen, and the sketch state attached as the HyperLogLog++ extension.
func (h *HLL) proto() *pb.AggregatorStateProto {
	kind := pb.AggregatorType_HYPERLOGLOG_PLUS_UNIQUE

	msg := &pb.AggregatorStateProto{
		Type:            &kind,
		EncodingVersion: proto.Int32(2),
		NumValues:       proto.Int64(h.n),
	}

	state := h.h.Proto()
	proto.SetExtension(msg, zetasketch.E_HyperloglogplusUniqueState, state)
	return msg
}
// fromProto validates msg and, if it is acceptable, replaces the state of h
// with the sketch and value count it carries.
//
// msg is rejected when its type is not HYPERLOGLOG_PLUS_UNIQUE, its encoding
// version is not 2, it has no num-values field, or its HyperLogLog++
// extension is not of the expected message type. Errors from rebuilding the
// sketch are returned as-is. h is only modified once every check has passed.
func (h *HLL) fromProto(msg *pb.AggregatorStateProto) error {
	switch aggType := msg.GetType(); {
	case aggType != pb.AggregatorType_HYPERLOGLOG_PLUS_UNIQUE:
		return fmt.Errorf("incompatible binary message: unexpected type %s", aggType.String())
	case msg.GetEncodingVersion() != 2:
		return fmt.Errorf("incompatible binary message: unsupported encoding version %#v", msg.GetEncodingVersion())
	case msg.NumValues == nil:
		return fmt.Errorf("incompatible binary message: no num values")
	}

	state, ok := proto.GetExtension(msg, zetasketch.E_HyperloglogplusUniqueState).(*pb.HyperLogLogPlusUniqueStateProto)
	if !ok {
		return fmt.Errorf("incompatible binary message: invalid HyperLogLog++ state")
	}

	sketch, err := hllplus.NewFromProto(state)
	if err != nil {
		return err
	}

	// Commit both fields together, only after validation has succeeded.
	h.h, h.n = sketch, msg.GetNumValues()
	return nil
}
// -----------------------------------------------------------------------
// HLLConfig specifies the configuration parameters for the HLL++ aggregator.
// A nil *HLLConfig is valid and selects the defaults for every parameter.
type HLLConfig struct {
	// Precision of the aggregator. Values outside the range
	// [hllplus.MinPrecision, hllplus.MaxPrecision], including the zero value,
	// are replaced by the default of 15.
	Precision uint8

	// SparsePrecision is the precision of the sparse representation. Values
	// below the effective precision or above hllplus.MaxSparsePrecision,
	// including the zero value, are replaced by the default of precision + 5,
	// capped at hllplus.MaxSparsePrecision.
	SparsePrecision uint8
}
// precision returns the effective precision: the configured value when it
// lies within [hllplus.MinPrecision, hllplus.MaxPrecision], and 15 otherwise.
// It is safe to call on a nil receiver.
func (c *HLLConfig) precision() uint8 {
	if c == nil || c.Precision < hllplus.MinPrecision || c.Precision > hllplus.MaxPrecision {
		return 15
	}
	return c.Precision
}
// sparsePrecision returns the effective sparse precision: the configured
// value when it is at least the effective precision and at most
// hllplus.MaxSparsePrecision. Otherwise it returns the effective precision
// plus 5, capped at hllplus.MaxSparsePrecision. It is safe to call on a nil
// receiver.
func (c *HLLConfig) sparsePrecision() uint8 {
	lower := c.precision()

	if c != nil {
		if sp := c.SparsePrecision; sp >= lower && sp <= hllplus.MaxSparsePrecision {
			return sp
		}
	}

	fallback := lower + 5
	if fallback > hllplus.MaxSparsePrecision {
		return hllplus.MaxSparsePrecision
	}
	return fallback
}