Skip to content

Commit

Permalink
Merge pull request #2294 from mabel-dev/#2230/1
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Jan 24, 2025
2 parents 8a98f50 + 9973546 commit 411cee7
Show file tree
Hide file tree
Showing 18 changed files with 313 additions and 147 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 1011
__build__ = 1017

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
3 changes: 2 additions & 1 deletion opteryx/compiled/joins/cross_join.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ cimport numpy as cnp
from libc.stdint cimport int64_t

from opteryx.third_party.abseil.containers cimport FlatHashSet
from cpython.object cimport PyObject_Hash


cpdef tuple build_rows_indices_and_column(cnp.ndarray column_data):
Expand Down Expand Up @@ -147,7 +148,7 @@ cpdef tuple list_distinct(cnp.ndarray values, cnp.int64_t[::1] indices, FlatHash

for i in range(n):
v = values[i]
hash_value = <int64_t>hash(v)
hash_value = PyObject_Hash(v)
if seen_hashes.insert(hash_value):
new_values[j] = v
new_indices[j] = indices[i]
Expand Down
29 changes: 19 additions & 10 deletions opteryx/compiled/list_ops/list_ops.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import numpy
cimport numpy as cnp
from cython import Py_ssize_t
from libc.stdint cimport int64_t
from libc.stdint cimport int64_t, uint8_t
from numpy cimport ndarray
from cpython.unicode cimport PyUnicode_AsUTF8String
from cpython.bytes cimport PyBytes_AsString
from cpython.object cimport PyObject_Hash

from opteryx.third_party.abseil.containers cimport FlatHashSet

Expand Down Expand Up @@ -326,7 +327,7 @@ cpdef cnp.ndarray[cnp.uint8_t, ndim=1] list_contains_any(cnp.ndarray array, cnp.
return res


cdef int boyer_moore_horspool(const char *haystack, size_t haystacklen, const char *needle, size_t needlelen):
cdef inline int boyer_moore_horspool(const char *haystack, size_t haystacklen, const char *needle, size_t needlelen):
"""
Case-sensitive Boyer-Moore-Horspool substring search.
Expand Down Expand Up @@ -369,7 +370,7 @@ cdef int boyer_moore_horspool(const char *haystack, size_t haystacklen, const ch
return 0 # No match found


cdef int boyer_moore_horspool_case_insensitive(const char *haystack, size_t haystacklen, const char *needle, size_t needlelen):
cdef inline int boyer_moore_horspool_case_insensitive(const char *haystack, size_t haystacklen, const char *needle, size_t needlelen):
"""
Case-insensitive Boyer-Moore-Horspool substring search.
Expand Down Expand Up @@ -430,28 +431,32 @@ cpdef cnp.ndarray[cnp.uint8_t, ndim=1] list_substring(cnp.ndarray[cnp.str, ndim=
cdef bytes needle_bytes = needle.encode('utf-8')
cdef char *c_pattern = PyBytes_AsString(needle_bytes)
cdef size_t pattern_length = len(needle_bytes)
cdef cnp.ndarray[cnp.uint8_t, ndim=1] result = numpy.zeros(n, dtype=numpy.uint8)
cdef cnp.ndarray[cnp.uint8_t, ndim=1] result = numpy.empty(n, dtype=numpy.uint8)
cdef Py_ssize_t i = 0
cdef Py_ssize_t length
cdef char *data

cdef uint8_t[::1] result_view = result

# Check the type of the first item to decide the processing method
if isinstance(haystack[0], str):
for i in range(n):
item = PyUnicode_AsUTF8String(haystack[i])
data = <char*> PyBytes_AsString(item)
length = len(item)
result_view[i] = 0
if length >= pattern_length:
if boyer_moore_horspool(data, length, c_pattern, pattern_length):
result[i] = 1
result_view[i] = 1
else:
for i in range(n):
item = haystack[i]
data = <char*> item
length = len(item)
result_view[i] = 0
if length >= pattern_length:
if boyer_moore_horspool(data, length, c_pattern, pattern_length):
result[i] = 1
result_view[i] = 1

return result

Expand All @@ -465,28 +470,32 @@ cpdef cnp.ndarray[cnp.uint8_t, ndim=1] list_substring_case_insensitive(cnp.ndarr
cdef bytes needle_bytes = needle.encode('utf-8')
cdef char *c_pattern = PyBytes_AsString(needle_bytes)
cdef size_t pattern_length = len(needle_bytes)
cdef cnp.ndarray[cnp.uint8_t, ndim=1] result = numpy.zeros(n, dtype=numpy.uint8)
cdef cnp.ndarray[cnp.uint8_t, ndim=1] result = numpy.empty(n, dtype=numpy.uint8)
cdef Py_ssize_t i = 0
cdef Py_ssize_t length
cdef char *data

cdef uint8_t[::1] result_view = result

# Check the type of the first item to decide the processing method
if isinstance(haystack[0], str):
for i in range(n):
item = PyUnicode_AsUTF8String(haystack[i])
data = <char*> PyBytes_AsString(item)
length = len(item)
result_view[i] = 0
if length >= pattern_length:
if boyer_moore_horspool_case_insensitive(data, length, c_pattern, pattern_length):
result[i] = 1
result_view[i] = 1
else:
for i in range(n):
item = haystack[i]
data = <char*> item
length = len(item)
result_view[i] = 0
if length >= pattern_length:
if boyer_moore_horspool_case_insensitive(data, length, c_pattern, pattern_length):
result[i] = 1
result_view[i] = 1

return result

Expand All @@ -498,7 +507,7 @@ cpdef FlatHashSet count_distinct(cnp.ndarray[object, ndim=1] values, FlatHashSet
object[:] values_view = values

for i in range(n):
hash_value = hash(values_view[i])
hash_value = PyObject_Hash(values_view[i])
seen_hashes.insert(hash_value)

return seen_hashes
131 changes: 128 additions & 3 deletions opteryx/compiled/structures/buffers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@
from libc.stdint cimport int64_t

import numpy
cimport numpy as cnp
cimport numpy

from libc.stdint cimport int64_t
from libc.stdlib cimport malloc, realloc, free
from cpython.object cimport PyObject
from cpython.long cimport PyLong_AsLongLong
from cpython.sequence cimport PySequence_Fast, PySequence_Fast_GET_SIZE, PySequence_Fast_GET_ITEM
from cpython.ref cimport Py_DECREF, Py_INCREF


cdef class IntBuffer:
"""
Expand All @@ -27,7 +35,7 @@ cdef class IntBuffer:

cpdef void append(self, int64_t value):
""" Append an integer to the buffer. """
cdef cnp.ndarray[int64_t, ndim=1] new_buffer
cdef numpy.ndarray[int64_t, ndim=1] new_buffer
if self.size == self.capacity:
self.capacity *= 2
new_buffer = numpy.zeros(self.capacity, dtype=numpy.int64)
Expand All @@ -42,10 +50,127 @@ cdef class IntBuffer:
for i in range(len(iterable)):
self.append(iterable[i])

cpdef cnp.ndarray[int64_t, ndim=1] to_numpy(self):
cpdef numpy.ndarray[int64_t, ndim=1] to_numpy(self):
""" Convert the buffer to a NumPy array without copying. """
return numpy.asarray(self._buffer[:self.size])

cpdef buffer(self):
""" Convert the buffer to a NumPy array without copying. """
return self._buffer[:self.size]

cdef class CIntBuffer:
    """
    A fast growable int64 buffer backed by raw C memory.

    Roughly twice as fast as the ndarray-backed IntBuffer. Earlier revisions
    intermittently segfaulted in `extend`: `PySequence_Fast` returns a NEW
    reference which Cython already owns (and releases) through the
    `object`-typed variable, so the explicit `Py_DECREF` calls released the
    sequence twice. Those manual decrefs have been removed.
    """
    cdef int64_t* data       # raw backing store
    cdef public size_t size  # number of valid entries
    cdef size_t capacity     # allocated entries
    cdef bint sealed         # set once to_numpy() has exported the memory

    def __cinit__(self, size_hint: int = 1024):
        if size_hint < 1:
            size_hint = 1
        self.size = 0
        self.capacity = size_hint
        self.data = <int64_t*> malloc(self.capacity * sizeof(int64_t))
        if self.data == NULL:
            raise MemoryError("Failed to allocate IntBuffer")
        self.sealed = False

    def __dealloc__(self):
        # Safe even after export: the exported NumPy array holds a reference
        # to `self` (as its base object), so this runs only once no array
        # views remain alive.
        if self.data != NULL:
            free(self.data)
            self.data = NULL

    cdef inline void ensure_capacity(self, size_t needed) except *:
        """
        Grow the allocation (by doubling) until at least `needed` entries fit.

        Raises:
            RuntimeError: if the buffer has already been exported to NumPy.
            MemoryError: if reallocation fails.
        """
        if self.sealed:
            raise RuntimeError("Cannot modify buffer after exporting to NumPy")

        cdef size_t new_cap
        cdef int64_t* new_data

        if needed <= self.capacity:
            return

        new_cap = self.capacity
        while new_cap < needed:
            new_cap <<= 1

        new_data = <int64_t*> realloc(self.data, new_cap * sizeof(int64_t))
        if new_data == NULL:
            raise MemoryError("Failed to reallocate IntBuffer")

        self.data = new_data
        self.capacity = new_cap

    cpdef void append(self, int64_t value) except *:
        """Append a single integer to the buffer."""
        if self.sealed:
            raise RuntimeError("Cannot append after exporting to NumPy")
        if self.size == self.capacity:
            self.ensure_capacity(self.size + 1)
        self.data[self.size] = value
        self.size += 1

    cpdef void extend(self, object iterable) except *:
        """
        Extend the buffer with a Python iterable of integers.

        Raises:
            RuntimeError: if the buffer has already been exported to NumPy.
            TypeError / OverflowError: if an item is not an int64-compatible
                integer (propagated from PyLong_AsLongLong).
        """
        if self.sealed:
            raise RuntimeError("Cannot extend after exporting to NumPy")

        # PySequence_Fast returns a NEW reference; binding it to an `object`
        # variable hands ownership to Cython, which decrefs it on scope exit.
        # Do NOT Py_DECREF it manually — the previous manual decrefs (on both
        # exit paths) double-freed the sequence and caused the intermittent
        # segfaults this class was known for.
        cdef object seq = PySequence_Fast(iterable, "extend requires an iterable")

        cdef Py_ssize_t length = PySequence_Fast_GET_SIZE(seq)
        if length <= 0:
            return

        self.ensure_capacity(self.size + length)

        cdef Py_ssize_t i
        cdef PyObject* item
        cdef int64_t value

        for i in range(length):
            item = PySequence_Fast_GET_ITEM(seq, i)  # borrowed reference
            # PyLong_AsLongLong is declared `except? -1` in cpython.long, so
            # conversion errors (non-int, overflow) propagate automatically.
            value = PyLong_AsLongLong(<object> item)
            self.data[self.size + i] = value

        self.size += length

    cpdef numpy.ndarray[int64_t, ndim=1] to_numpy(self):
        """
        Export the buffer as a NumPy int64 array (zero-copy, one-shot).

        The array wraps the buffer's memory directly; the buffer is sealed
        against further modification and kept alive as the array's base
        object. NOTE(review): relies on numpy.import_array() having been
        called at module init — confirm the module does this.

        Returns:
            A 1-D int64 ndarray viewing this buffer's memory.
        """
        if self.sealed:
            raise RuntimeError("Already exported to NumPy.")
        self.sealed = True  # prevent further modification

        # PyArray_SimpleNewFromData expects an npy_intp* for dims — pass a
        # real C array, not a Python list.
        cdef numpy.npy_intp dims[1]
        dims[0] = <numpy.npy_intp> self.size

        cdef numpy.ndarray[int64_t, ndim=1] numpy_array
        numpy_array = numpy.PyArray_SimpleNewFromData(
            1, dims, numpy.NPY_INT64, <void*> self.data
        )

        # NumPy must not free this memory; __dealloc__ owns it.
        numpy.PyArray_CLEARFLAGS(numpy_array, numpy.NPY_ARRAY_OWNDATA)

        # Keep `self` alive for as long as the array exists.
        Py_INCREF(self)  # PyArray_SetBaseObject steals this reference
        numpy.PyArray_SetBaseObject(numpy_array, <object> self)

        return numpy_array
Loading

0 comments on commit 411cee7

Please sign in to comment.