Skip to content

Commit

Permalink
Merge pull request #2281 from mabel-dev/#2279
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Jan 20, 2025
2 parents a7dc778 + 456b37e commit 607f67e
Show file tree
Hide file tree
Showing 11 changed files with 7,762 additions and 26 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 999
__build__ = 1003

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
1 change: 1 addition & 0 deletions opteryx/compiled/structures/bloom_filter.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ cdef class BloomFilter:
cdef uint32_t byte_array_size

cpdef void add(self, bytes member)
cdef inline void _add(self, bytes member)
cpdef bint possibly_contains(self, bytes member)
cdef inline bint _possibly_contains(self, bytes member)
cpdef cnp.ndarray[cnp.npy_bool, ndim=1] possibly_contains_many(self, cnp.ndarray keys)
Expand Down
79 changes: 57 additions & 22 deletions opteryx/compiled/structures/bloom_filter.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,32 @@
# cython: boundscheck=False

"""
This is not a general perpose Bloom Filter, if used outside of Draken, it may not
perform entirely as expected as it is optimized for a specific configuration.
This is not a general perpose Bloom Filter, if used outside Opteryx it may not
perform entirely as expected as it is optimized for a specific configuration
and constraints.
We have two size options, both using 2 hashes:
- A 512k slot bit array for up to 50k items (about 3% FPR)
- a 8m slot bit array for up to 1m items (about 2% FPR)
We perform one hash and then use a calculation based on the golden ratio to
determine the second position.
determine the second position. This is cheaper than performing two hashes whilst
still providing a good enough split of the two hashes.
The primary use for this structure is to prefilter JOINs, it is many times faster
(about 20x from initial benchmarking) to test for containment in the bloom filter
that to look up the item in the hash table.
Building the filter is fast - for tables up to 1 million records we create the filter
(1m records is roughly a 0.07s build). If the filter isn't effective (less that 5%
eliminations) we discard it which has meant some waste work.
"""

from libc.stdlib cimport malloc, free
from libc.string cimport memset, memcpy
from libc.stdint cimport uint8_t

from opteryx.compiled.functions.murmurhash3_32 cimport cy_murmurhash3
from opteryx.third_party.cyan4973.xxhash cimport cy_xxhash3_64

import numpy
cimport numpy as cnp
Expand All @@ -41,9 +52,10 @@ cdef inline void set_bit(unsigned char* bit_array, uint32_t bit):
bit_array[byte_idx] |= 1 << bit_idx

cdef class BloomFilter:
# cdef unsigned char* bit_array # defined in the .pxd file only
# cdef uint32_t bit_array_size
# cdef uint32_t byte_array_size
# defined in the .pxd file only - here so they aren't magic
# cdef unsigned char* bit_array
# cdef uint32_t bit_array_size
# cdef uint32_t byte_array_size

def __cinit__(self, uint32_t expected_records=50000):
"""Initialize Bloom Filter based on expected number of records."""
Expand All @@ -66,47 +78,64 @@ cdef class BloomFilter:
if self.bit_array:
free(self.bit_array)

cpdef void add(self, bytes member):
cdef inline void _add(self, bytes member):
cdef uint32_t item, h1, h2

item = cy_murmurhash3(<char*>member, len(member), 0)
item = cy_xxhash3_64(<char*>member, len(member))
h1 = item & (self.bit_array_size - 1)
# Apply the golden ratio to the item and use modulo to wrap within the size of the bit array
# Apply the golden ratio to the item and use a mask to keep within the
# size of the bit array.
h2 = (item * 2654435769U) & (self.bit_array_size - 1)
# Set bits
set_bit(self.bit_array, h1)
set_bit(self.bit_array, h2)

cpdef void add(self, bytes member):
self._add(member)

cdef inline bint _possibly_contains(self, bytes member):
"""Check if the item might be in the set"""
cdef uint32_t item, h1, h2

item = cy_murmurhash3(<char*>member, len(member), 0)
item = cy_xxhash3_64(<char*>member, len(member))
h1 = item & (self.bit_array_size - 1)
# Apply the golden ratio to the item and mask within the size of the bit array
h2 = (item * 2654435769U) & (self.bit_array_size - 1)
# Check bits using bitwise AND
return ((self.bit_array[h1 >> 3] & (1 << (h1 & 7))) != 0) and \
((self.bit_array[h2 >> 3] & (1 << (h2 & 7))) != 0)

cpdef bint possibly_contains(self, bytes member):
return self._possibly_contains(member)

cpdef cnp.ndarray[cnp.npy_bool, ndim=1] possibly_contains_many(self, cnp.ndarray keys):
"""
Return a boolean array indicating whether each key might be in the Bloom filter.
Parameters:
keys: cnp.ndarray
Array of keys to test for membership.
Returns:
A boolean array of the same length as `keys` with True or False values.
"""
cdef Py_ssize_t i
cdef Py_ssize_t n = len(keys)
cdef cnp.ndarray[cnp.npy_bool, ndim=1] result = numpy.zeros(n, dtype=bool)
cdef Py_ssize_t n = keys.shape[0]

# Create an uninitialized bool array rather than a zeroed one
cdef cnp.ndarray[cnp.npy_bool, ndim=1] result = numpy.empty(n, dtype=numpy.bool_)

# Wrap both `keys` and `result` in typed memory views for faster indexing
cdef object[::1] keys_view = keys
cdef uint8_t[::1] result_view = result

for i in range(n):
key = keys[i]
if key is not None and self._possibly_contains(key):
result[i] = 1
result_view[i] = False if keys_view[i] is None else self._possibly_contains(keys_view[i])

return result

cpdef memoryview serialize(self):
"""Serialize the Bloom filter to a memory view"""
return memoryview(self.bit_array[:self.byte_array_size])


cpdef BloomFilter deserialize(const unsigned char* data):
"""Deserialize a memory view to a Bloom filter"""
cdef BloomFilter bf = BloomFilter()
Expand All @@ -115,11 +144,17 @@ cpdef BloomFilter deserialize(const unsigned char* data):


cpdef BloomFilter create_bloom_filter(keys):
cdef BloomFilter bf = BloomFilter(len(keys))

cdef Py_ssize_t n = len(keys)
cdef Py_ssize_t i
cdef BloomFilter bf = BloomFilter(n)

keys = keys.drop_null()
keys = keys.cast(pyarrow.binary()).to_numpy(False)
for key in keys:
bf.add(key)

cdef object[::1] keys_view = keys # Memory view for fast access

for i in range(len(keys)):
bf._add(keys_view[i])

return bf
28 changes: 28 additions & 0 deletions opteryx/compiled/third_party/xxhash.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: nonecheck=False
# cython: overflowcheck=False
# cython: cdivision=True
# distutils: language=c++

from libc.stdint cimport uint64_t
from libc.stddef cimport size_t
from cpython.bytes cimport PyBytes_AsStringAndSize

# Import xxHash function signatures from `xxhash.h`
cdef extern from "xxhash.h":
uint64_t XXH3_64bits(const void* input, size_t length) nogil

cdef inline uint64_t cy_xxhash3_64(const void *key, size_t len) nogil:
return XXH3_64bits(key, len)

cpdef uint64_t hash_bytes(bytes key):
""" Python-accessible function for hashing bytes. """
cdef char* data
cdef Py_ssize_t length

if PyBytes_AsStringAndSize(key, &data, &length) != 0:
raise ValueError("Invalid byte string")

return cy_xxhash3_64(data, length)
6 changes: 3 additions & 3 deletions opteryx/operators/inner_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ def execute(self, morsel: Table, join_leg: str) -> Table:
morsel = morsel.filter(maybe_in_left)

# If the bloom filter is not effective, disable it.
# In basic benchmarks, the bloom filter is ~15x the speed of the join.
# so the break-even point is about 7% of the rows being eliminated.
# In basic benchmarks, the bloom filter is ~20x the speed of the join.
# so the break-even point is about 5% of the rows being eliminated.
eliminated_rows = len(maybe_in_left) - morsel.num_rows
if eliminated_rows < 0.1 * morsel.num_rows:
if eliminated_rows < 0.05 * len(maybe_in_left):
self.left_filter = None
self.statistics.feature_dynamically_disabled_bloom_filter += 1

Expand Down
12 changes: 12 additions & 0 deletions opteryx/third_party/cyan4973/xxhash.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: nonecheck=False
# cython: overflowcheck=False
# cython: lintrule=ignore

from libc.stdint cimport uint64_t
from libc.stddef cimport size_t

cdef uint64_t cy_xxhash3_64(const void *key, size_t len) nogil
cpdef uint64_t hash_bytes(bytes key)
10 changes: 10 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ def rust_build(setup_kwargs: Dict[str, Any]) -> None:
extra_compile_args=COMPILE_FLAGS + ["-std=c++17"],
extra_link_args=["-Lthird_party/abseil"], # Link Abseil library
),
Extension(
name="opteryx.third_party.cyan4973.xxhash",
sources=[
"opteryx/compiled/third_party/xxhash.pyx",
"third_party/cyan4973/xxhash.c"
],
include_dirs=include_dirs + ["third_party/cyan4973"],
extra_compile_args=COMPILE_FLAGS,
extra_link_args=["-Lthird_party/cyan4973"], # Link Abseil library
),
Extension(
name="opteryx.compiled.functions.functions",
sources=["opteryx/compiled/functions/functions.pyx"],
Expand Down
26 changes: 26 additions & 0 deletions third_party/cyan4973/LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
xxHash Library
Copyright (c) 2012-2021 Yann Collet
All rights reserved.

BSD 2-Clause License (https://www.opensource.org/licenses/bsd-license.php)

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Loading

0 comments on commit 607f67e

Please sign in to comment.