Commit

joocer committed Feb 6, 2025
1 parent 4df9c50 commit 42d5b54
Showing 13 changed files with 212 additions and 136 deletions.
38 changes: 36 additions & 2 deletions opteryx/compiled/joins/inner_join.pyx
@@ -6,11 +6,12 @@
# cython: wraparound=False
# cython: boundscheck=False

cimport numpy as cnp
cimport numpy
import numpy
from libc.stdint cimport uint8_t, int64_t

from opteryx.third_party.abseil.containers cimport FlatHashMap
from opteryx.compiled.structures.buffers cimport IntBuffer
from cpython.object cimport PyObject_Hash

cpdef FlatHashMap abs_hash_join_map(relation, list join_columns):
@@ -57,7 +58,7 @@ cpdef FlatHashMap abs_hash_join_map(relation, list join_columns):
combined_nulls[i] &= bit

# Get non-null indices using memory views
cdef cnp.ndarray non_null_indices = numpy.nonzero(combined_nulls)[0]
cdef numpy.ndarray non_null_indices = numpy.nonzero(combined_nulls)[0]

# Memory view for the values array (for the join columns)
cdef object[:, ::1] values_array = numpy.array(list(relation.take(non_null_indices).select(join_columns).itercolumns()), dtype=object)
@@ -78,3 +79,36 @@ cpdef FlatHashMap abs_hash_join_map(relation, list join_columns):
ht.insert(hash_value, non_null_indices[i])

return ht
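
The null-filtering step above can be sketched in pure NumPy (a schematic only; the per-column validity masks are assumed to be uint8 arrays where 1 means the value is present):

```python
import numpy

def non_null_row_indices(validity_masks):
    # AND the per-column validity masks together, then keep only the row
    # indices where every join key is non-null - mirroring combined_nulls
    # and numpy.nonzero(combined_nulls)[0] above.
    combined = numpy.ones_like(validity_masks[0])
    for mask in validity_masks:
        combined &= mask
    return numpy.nonzero(combined)[0]
```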


cpdef tuple nested_loop_join(left_relation, right_relation, numpy.ndarray left_columns, numpy.ndarray right_columns):
cdef IntBuffer left_indexes = IntBuffer()
cdef IntBuffer right_indexes = IntBuffer()
cdef int64_t nl = left_relation.shape[0]
cdef int64_t nr = right_relation.shape[0]
cdef int64_t left_idx, right_idx
cdef int64_t left_hash_value, right_hash_value
cdef object value

cdef object[:, ::1] left_values_array = numpy.array(list(left_relation.select(left_columns).itercolumns()), dtype=object)
cdef object[:, ::1] right_values_array = numpy.array(list(right_relation.select(right_columns).itercolumns()), dtype=object)

cdef int64_t[::1] right_hashes = numpy.empty(nr, dtype=numpy.int64)
for right_idx in range(nr):
right_hash_value = 0
for value in right_values_array[:, right_idx]:
right_hash_value = <int64_t>(right_hash_value * 31 + PyObject_Hash(value))
right_hashes[right_idx] = right_hash_value

for left_idx in range(nl):

left_hash_value = 0
for value in left_values_array[:, left_idx]:
left_hash_value = <int64_t>(left_hash_value * 31 + PyObject_Hash(value))

for right_idx in range(nr):
if left_hash_value == right_hashes[right_idx]:
left_indexes.append(left_idx)
right_indexes.append(right_idx)

return (left_indexes.to_numpy(), right_indexes.to_numpy())
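
The matching semantics of nested_loop_join can be illustrated with a pure-Python sketch (illustrative only: the compiled version operates on memoryviews and wraps the running hash to int64, whereas Python integers grow unbounded):

```python
def nested_loop_join_sketch(left_keys, right_keys):
    """left_keys/right_keys: lists of join-key tuples."""
    def combined_hash(row):
        h = 0
        for value in row:
            h = h * 31 + hash(value)  # same 31-based polynomial as the .pyx
        return h

    right_hashes = [combined_hash(row) for row in right_keys]  # hash right once
    left_out, right_out = [], []
    for li, lrow in enumerate(left_keys):
        lh = combined_hash(lrow)
        for ri, rh in enumerate(right_hashes):
            # rows are paired on hash equality alone, so a hash collision
            # would produce a spurious match
            if lh == rh:
                left_out.append(li)
                right_out.append(ri)
    return left_out, right_out
```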
23 changes: 23 additions & 0 deletions opteryx/compiled/structures/buffers.pxd
@@ -0,0 +1,23 @@
# cython: language_level=3
# cython: nonecheck=False
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=False
# cython: boundscheck=False

from libc.stdint cimport int64_t

import numpy
cimport numpy

cdef class IntBuffer:

cdef public int64_t[::1] _buffer
cdef public int64_t size
cdef public int64_t capacity

cpdef void append(self, int64_t value)
cpdef void extend(self, iterable)
cpdef numpy.ndarray[int64_t, ndim=1] to_numpy(self)
cpdef buffer(self)
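
Given these declarations, usage looks roughly like this (a sketch; construction details follow from the .pyx below, where size_hint defaults to 1024):

```python
from opteryx.compiled.structures.buffers import IntBuffer

buf = IntBuffer()        # starts at its size_hint and grows as needed
buf.append(7)
buf.extend([11, 13, 17])
arr = buf.to_numpy()     # one-dimensional numpy.int64 array: [7, 11, 13, 17]
view = buf.buffer()      # zero-copy view over the first `size` elements
```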
131 changes: 3 additions & 128 deletions opteryx/compiled/structures/buffers.pyx
@@ -7,26 +7,18 @@
# cython: wraparound=False
# cython: boundscheck=False

from libc.stdint cimport int64_t

import numpy
cimport numpy

from libc.stdint cimport int64_t
from libc.stdlib cimport malloc, realloc, free
from cpython.object cimport PyObject
from cpython.long cimport PyLong_AsLongLong
from cpython.sequence cimport PySequence_Fast, PySequence_Fast_GET_SIZE, PySequence_Fast_GET_ITEM
from cpython.ref cimport Py_DECREF, Py_INCREF


cdef class IntBuffer:
"""
A fast integer buffer using Cython-managed memory.
"""
cdef public int64_t[:] _buffer
cdef public size_t size
cdef public size_t capacity
# cdef public int64_t[::1] _buffer
# cdef public int64_t size
# cdef public int64_t capacity

def __cinit__(self, size_hint: int = 1024):
self.capacity = size_hint
@@ -57,120 +49,3 @@ cdef class IntBuffer:
cpdef buffer(self):
""" Convert the buffer to a NumPy array without copying. """
return self._buffer[:self.size]

cdef class CIntBuffer:
"""
CIntBuffer is roughly twice as fast but intermittently segfaults; the issue appears to be
incorrect ownership when releasing/freeing the memory, likely due to pyarrow threading/slicing,
but nothing I've tried has stopped the segfaulting.
A fast growable integer buffer backed by raw C memory.
"""
cdef int64_t* data
cdef public size_t size
cdef size_t capacity
cdef bint sealed # 🚫 Prevents modifications after `to_numpy()`

def __cinit__(self, size_hint: int = 1024):
if size_hint < 1:
size_hint = 1
self.size = 0
self.capacity = size_hint
self.data = <int64_t*> malloc(self.capacity * sizeof(int64_t))
if self.data == NULL:
raise MemoryError("Failed to allocate IntBuffer")
self.sealed = False

def __dealloc__(self):
# Always free if data != NULL:
if self.data != NULL:
free(self.data)
self.data = NULL

cdef inline void ensure_capacity(self, size_t needed) except *:
"""
Ensures we have enough space. 🚫 Disabled if sealed.
"""
if self.sealed:
raise RuntimeError("Cannot modify buffer after exporting to NumPy")

cdef size_t new_cap
cdef int64_t* new_data

if needed <= self.capacity:
return

new_cap = self.capacity
while new_cap < needed:
new_cap <<= 1

new_data = <int64_t*> realloc(self.data, new_cap * sizeof(int64_t))
if new_data == NULL:
raise MemoryError("Failed to reallocate IntBuffer")

self.data = new_data
self.capacity = new_cap

cpdef void append(self, int64_t value) except *:
if self.sealed:
raise RuntimeError("Cannot append after exporting to NumPy")
if self.size == self.capacity:
self.ensure_capacity(self.size + 1)
self.data[self.size] = value
self.size += 1

cpdef void extend(self, object iterable) except *:
"""
Extend the buffer with a Python iterable of integers.
"""
if self.sealed:
raise RuntimeError("Cannot extend after exporting to NumPy")

cdef object seq = PySequence_Fast(iterable, "extend requires an iterable")
if seq is None:
raise TypeError("extend requires an iterable")

cdef Py_ssize_t length = PySequence_Fast_GET_SIZE(seq)
if length <= 0:
Py_DECREF(seq)
return

self.ensure_capacity(self.size + length)

cdef Py_ssize_t i
cdef PyObject* item
cdef int64_t value

for i in range(length):
item = PySequence_Fast_GET_ITEM(seq, i)
value = PyLong_AsLongLong(<object> item)
self.data[self.size + i] = value

self.size += length
Py_DECREF(seq)

cpdef numpy.ndarray[int64_t, ndim=1] to_numpy(self):
"""
Safely converts the buffer into a NumPy array without causing memory issues.
Returns:
A NumPy array backed by the buffer's memory (zero-copy).
"""
if self.sealed:
raise RuntimeError("Already exported to NumPy.")
self.sealed = True # Prevent further modification

# ✅ Create a NumPy array directly from the buffer
cdef numpy.ndarray[int64_t, ndim=1] numpy_array
numpy_array = numpy.PyArray_SimpleNewFromData(
1, [self.size], numpy.NPY_INT64, <void*>self.data
)

# ✅ Prevent NumPy from freeing the memory
numpy.PyArray_CLEARFLAGS(numpy_array, numpy.NPY_ARRAY_OWNDATA)

# ✅ Attach `self` as the BaseObject so NumPy keeps `CIntBuffer` alive
Py_INCREF(self) # Ensure Python keeps a reference
numpy.PyArray_SetBaseObject(numpy_array, <object>self)

return numpy_array
4 changes: 3 additions & 1 deletion opteryx/connectors/disk_connector.py
@@ -32,6 +32,8 @@
# Define os.O_BINARY for non-Windows platforms if it's not already defined
if not hasattr(os, "O_BINARY"):
os.O_BINARY = 0 # Value has no effect on non-Windows platforms
if not hasattr(os, "O_DIRECT"):
os.O_DIRECT = 0 # Value has no effect on non-Windows platforms


def read_blob(
@@ -70,7 +72,7 @@ def read_blob(
import mmap

try:
file_descriptor = os.open(blob_name, os.O_RDONLY | os.O_BINARY)
file_descriptor = os.open(blob_name, os.O_RDONLY | os.O_BINARY | os.O_DIRECT)
if hasattr(os, "posix_fadvise"):
os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_WILLNEED)
size = os.fstat(file_descriptor).st_size
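
The platform shim above can also be written with getattr; a minimal sketch of the same pattern (not the connector's code):

```python
import os

# Missing flags become 0, so OR-ing them into os.open() is a portable no-op.
O_BINARY = getattr(os, "O_BINARY", 0)  # Windows-only flag
O_DIRECT = getattr(os, "O_DIRECT", 0)  # Linux flag: bypass the page cache

flags = os.O_RDONLY | O_BINARY | O_DIRECT
# Caveat: on Linux, plain os.read() under O_DIRECT requires block-aligned
# offsets and buffers; memory-mapping the descriptor (read_blob imports mmap)
# sidesteps that restriction.
```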
4 changes: 2 additions & 2 deletions opteryx/functions/string_functions.py
@@ -359,11 +359,11 @@ def match_against(arr, val):
def regex_replace(array, _pattern, _replacement):
import pyarrow

from opteryx.third_party.mrabarnett import regex as re
from opteryx.third_party.mrabarnett import regex

pattern = _pattern[0]
replacement = _replacement[0]
compiled_pattern = re.compile(pattern)
compiled_pattern = regex.compile(pattern)

# Apply the regex replacement to each element in the array
vectorized_replace = numpy.vectorize(lambda x: compiled_pattern.sub(replacement, x))
1 change: 1 addition & 0 deletions opteryx/operators/__init__.py
@@ -27,6 +27,7 @@

# from .information_schema_node import InformationSchemaNode # information_schema
from .inner_join_node import InnerJoinNode
from .nested_loop_join_node import NestedLoopJoinNode
from .limit_node import LimitNode # select the first N records

from .outer_join_node import OuterJoinNode
3 changes: 1 addition & 2 deletions opteryx/operators/inner_join_node.py
@@ -35,7 +35,7 @@
from opteryx.compiled.joins.inner_join import abs_hash_join_map
from opteryx.compiled.structures import hash_join_map
from opteryx.compiled.structures.bloom_filter import create_bloom_filter
from opteryx.compiled.structures.buffers import IntBuffer as IntBuffer
from opteryx.compiled.structures.buffers import IntBuffer
from opteryx.models import QueryProperties
from opteryx.utils.arrow import align_tables

@@ -85,7 +85,6 @@ def __init__(self, properties: QueryProperties, **parameters):
self.right_columns = parameters.get("right_columns")

self.left_buffer = []
self.right_buffer = []
self.left_hash = None
self.left_filter = None

74 changes: 74 additions & 0 deletions opteryx/operators/nested_loop_join_node.py
@@ -0,0 +1,74 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# See the License at http://www.apache.org/licenses/LICENSE-2.0
# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND.

"""
Inner (Nested Loop) Join Node
This is a SQL Query Execution Plan Node.
This is an implementation of a nested loop join, a simple join algorithm that excels
when one of the relations is very small; in that situation it is many times faster than
a hash join because we don't need to build a hash table.
The Join Order Optimization Strategy decides whether to use this node, based on relation size.
"""

from threading import Lock

import numpy
import pyarrow
from pyarrow import Table

from opteryx import EOS
from opteryx.compiled.joins.inner_join import nested_loop_join
from opteryx.models import QueryProperties
from opteryx.utils.arrow import align_tables

from . import JoinNode


class NestedLoopJoinNode(JoinNode):
join_type = "nested_loop"

def __init__(self, properties: QueryProperties, **parameters):
JoinNode.__init__(self, properties=properties, **parameters)

self.left_columns = numpy.array(parameters.get("left_columns"), dtype=numpy.bytes_)
self.right_columns = numpy.array(parameters.get("right_columns"), dtype=numpy.bytes_)

self.left_relation = None
self.left_buffer = []
self.lock = Lock()

@property
def name(self): # pragma: no cover
return "Nested Loop Join"

@property
def config(self): # pragma: no cover
return ""

def execute(self, morsel: Table, join_leg: str) -> Table:
with self.lock:
if join_leg == "left":
if morsel == EOS:
self.left_relation = pyarrow.concat_tables(
self.left_buffer, promote_options="none"
)
self.left_buffer.clear()
else:
self.left_buffer.append(morsel)
yield None
return

if join_leg == "right":
if morsel == EOS:
yield EOS
return

left_indexes, right_indexes = nested_loop_join(
self.left_relation, morsel, self.left_columns, self.right_columns
)
yield align_tables(self.left_relation, morsel, left_indexes, right_indexes)
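
A back-of-envelope reading of the docstring's claim (illustrative numbers, not measurements from this commit): both algorithms hash every row once; the nested loop then pays cheap int64 comparisons, while a hash join pays hash-table inserts and probes plus allocation, so the break-even depends on how much a table insert costs relative to a comparison.

```python
# Hypothetical sizes: one very small relation (the case this node targets).
n_left, n_right = 1_000_000, 8

hashes = n_left + n_right                # paid by both algorithms

nested_loop_compares = n_left * n_right  # 8,000,000 cheap int64 comparisons
hash_join_table_ops = n_left + n_right   # ~1,000,008 inserts + probes, each far
                                         # costlier than a comparison, plus the
                                         # hash-table allocation itself
```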
16 changes: 16 additions & 0 deletions opteryx/planner/binder/binder_visitor.py
@@ -715,6 +715,22 @@ def visit_join(self, node: Node, context: BindingContext) -> Tuple[Node, BindingContext]:
f"CROSS JOIN UNNEST requires an ARRAY type column, not {node.unnest_column.schema_column.type}."
)

# this is very much not how we want to do this, but let's start somewhere
node.left_size = sum(
context.schemas[relation_name].row_count_metric
or context.schemas[relation_name].row_count_estimate
or float("inf")
for relation_name in node.left_relation_names
if relation_name in context.schemas
)
node.right_size = sum(
context.schemas[relation_name].row_count_metric
or context.schemas[relation_name].row_count_estimate
or float("inf")
for relation_name in node.right_relation_names
if relation_name in context.schemas
)

if node.type == "inner" and node.on is None:
from opteryx.exceptions import SqlError

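
A schematic of the sizing rule above, pulled out as a function (names mirror the snippet; this is not code from the commit). One subtlety of the `or` chain: an exact metric of 0 rows is falsy, so it falls through to the estimate, and fully unknown sizes become infinity, meaning a join leg only ever looks "small" when every relation feeding it has a known count.

```python
def leg_size(schemas, relation_names):
    return sum(
        schemas[name].row_count_metric       # exact count when available...
        or schemas[name].row_count_estimate  # ...else an estimate...
        or float("inf")                      # ...else assume unboundedly large
        for name in relation_names
        if name in schemas
    )
```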
1 change: 1 addition & 0 deletions opteryx/planner/optimizer/__init__.py
@@ -80,6 +80,7 @@ def __init__(self, statistics: QueryStatistics):
PredicateRewriteStrategy(statistics),
PredicatePushdownStrategy(statistics),
ProjectionPushdownStrategy(statistics),
JoinOrderingStrategy(statistics),
DistinctPushdownStrategy(statistics),
OperatorFusionStrategy(statistics),
LimitPushdownStrategy(statistics),