diff --git a/opteryx/compiled/joins/inner_join.pyx b/opteryx/compiled/joins/inner_join.pyx index ea8a800d..596e6e70 100644 --- a/opteryx/compiled/joins/inner_join.pyx +++ b/opteryx/compiled/joins/inner_join.pyx @@ -6,11 +6,12 @@ # cython: wraparound=False # cython: boundscheck=False -cimport numpy as cnp +cimport numpy import numpy from libc.stdint cimport uint8_t, int64_t from opteryx.third_party.abseil.containers cimport FlatHashMap +from opteryx.compiled.structures.buffers cimport IntBuffer from cpython.object cimport PyObject_Hash cpdef FlatHashMap abs_hash_join_map(relation, list join_columns): @@ -57,7 +58,7 @@ cpdef FlatHashMap abs_hash_join_map(relation, list join_columns): combined_nulls[i] &= bit # Get non-null indices using memory views - cdef cnp.ndarray non_null_indices = numpy.nonzero(combined_nulls)[0] + cdef numpy.ndarray non_null_indices = numpy.nonzero(combined_nulls)[0] # Memory view for the values array (for the join columns) cdef object[:, ::1] values_array = numpy.array(list(relation.take(non_null_indices).select(join_columns).itercolumns()), dtype=object) @@ -78,3 +79,36 @@ cpdef FlatHashMap abs_hash_join_map(relation, list join_columns): ht.insert(hash_value, non_null_indices[i]) return ht + + +cpdef tuple nested_loop_join(left_relation, right_relation, numpy.ndarray left_columns, numpy.ndarray right_columns): + cdef IntBuffer left_indexes = IntBuffer() + cdef IntBuffer right_indexes = IntBuffer() + cdef int64_t nl = left_relation.shape[0] + cdef int64_t nr = right_relation.shape[0] + cdef int64_t left_idx, right_idx + cdef int64_t left_hash_value, right_hash_value + cdef object value + + cdef object[:, ::1] left_values_array = numpy.array(list(left_relation.select(left_columns).itercolumns()), dtype=object) + cdef object[:, ::1] right_values_array = numpy.array(list(right_relation.select(right_columns).itercolumns()), dtype=object) + + cdef int64_t[::1] right_hashes = numpy.empty(nr, dtype=numpy.int64) + for right_idx in range(nr): + right_hash_value = 0 + for value in right_values_array[:, right_idx]: + right_hash_value = (right_hash_value * 31 + PyObject_Hash(value)) + right_hashes[right_idx] = right_hash_value + + for left_idx in range(nl): + + left_hash_value = 0 + for value in left_values_array[:, left_idx]: + left_hash_value = (left_hash_value * 31 + PyObject_Hash(value)) + + for right_idx in range(nr): + if left_hash_value == right_hashes[right_idx]: + left_indexes.append(left_idx) + right_indexes.append(right_idx) + + return (left_indexes.to_numpy(), right_indexes.to_numpy()) diff --git a/opteryx/compiled/structures/buffers.pxd b/opteryx/compiled/structures/buffers.pxd new file mode 100644 index 00000000..e92268de --- /dev/null +++ b/opteryx/compiled/structures/buffers.pxd @@ -0,0 +1,23 @@ +# cython: language_level=3 +# cython: nonecheck=False +# cython: cdivision=True +# cython: initializedcheck=False +# cython: infer_types=True +# cython: wraparound=False +# cython: boundscheck=False + +from libc.stdint cimport int64_t + +import numpy +cimport numpy + +cdef class IntBuffer: + + cdef public int64_t[::1] _buffer + cdef public int64_t size + cdef public int64_t capacity + + cpdef void append(self, int64_t value) + cpdef void extend(self, iterable) + cpdef numpy.ndarray[int64_t, ndim=1] to_numpy(self) + cpdef buffer(self) \ No newline at end of file diff --git a/opteryx/compiled/structures/buffers.pyx b/opteryx/compiled/structures/buffers.pyx index 81978cb7..7fe4052f 100644 --- a/opteryx/compiled/structures/buffers.pyx +++ b/opteryx/compiled/structures/buffers.pyx @@ -7,26 +7,18 @@ # cython: wraparound=False # cython: boundscheck=False -from libc.stdint cimport int64_t - import numpy cimport numpy from libc.stdint cimport int64_t -from libc.stdlib cimport malloc, realloc, free -from cpython.object cimport PyObject -from cpython.long cimport PyLong_AsLongLong -from cpython.sequence cimport PySequence_Fast, PySequence_Fast_GET_SIZE, PySequence_Fast_GET_ITEM -from cpython.ref cimport Py_DECREF, Py_INCREF - cdef class IntBuffer: """ A fast integer buffer using Cython-managed memory. """ - cdef public int64_t[:] _buffer - cdef public size_t size - cdef public size_t capacity + # cdef public int64_t[::1] _buffer + # cdef public int64_t size + # cdef public int64_t capacity def __cinit__(self, size_hint: int = 1024): self.capacity = size_hint @@ -57,120 +49,3 @@ cdef class IntBuffer: cpdef buffer(self): """ Convert the buffer to a NumPy array without copying. """ return self._buffer[:self.size] - -cdef class CIntBuffer: - """ - CIntBuffer is roughly twice as fast but intermittently sigfaults, the issue appears to be - with the ownership and releasing/freeing incorrectly. Likely due to pyarrow threading/slicing - but nothing I've tried has stopped the segfaulting. - - A fast growable integer buffer backed by raw C memory. - """ - cdef int64_t* data - cdef public size_t size - cdef size_t capacity - cdef bint sealed # 🚫 Prevents modifications after `to_numpy()` - - def __cinit__(self, size_hint: int = 1024): - if size_hint < 1: - size_hint = 1 - self.size = 0 - self.capacity = size_hint - self.data = malloc(self.capacity * sizeof(int64_t)) - if self.data == NULL: - raise MemoryError("Failed to allocate IntBuffer") - self.sealed = False - - def __dealloc__(self): - # Always free if data != NULL: - if self.data != NULL: - free(self.data) - self.data = NULL - - cdef inline void ensure_capacity(self, size_t needed) except *: - """ - Ensures we have enough space. 🚫 Disabled if sealed. - """ - if self.sealed: - raise RuntimeError("Cannot modify buffer after exporting to NumPy") - - cdef size_t new_cap - cdef int64_t* new_data - - if needed <= self.capacity: - return - - new_cap = self.capacity - while new_cap < needed: - new_cap <<= 1 - - new_data = realloc(self.data, new_cap * sizeof(int64_t)) - if new_data == NULL: - raise MemoryError("Failed to reallocate IntBuffer") - - self.data = new_data - self.capacity = new_cap - - cpdef void append(self, int64_t value) except *: - if self.sealed: - raise RuntimeError("Cannot append after exporting to NumPy") - if self.size == self.capacity: - self.ensure_capacity(self.size + 1) - self.data[self.size] = value - self.size += 1 - - cpdef void extend(self, object iterable) except *: - """ - Extend the buffer with a Python iterable of integers. - """ - if self.sealed: - raise RuntimeError("Cannot extend after exporting to NumPy") - - cdef object seq = PySequence_Fast(iterable, "extend requires an iterable") - if seq is None: - raise TypeError("extend requires an iterable") - - cdef Py_ssize_t length = PySequence_Fast_GET_SIZE(seq) - if length <= 0: - Py_DECREF(seq) - return - - self.ensure_capacity(self.size + length) - - cdef Py_ssize_t i - cdef PyObject* item - cdef int64_t value - - for i in range(length): - item = PySequence_Fast_GET_ITEM(seq, i) - value = PyLong_AsLongLong( item) - self.data[self.size + i] = value - - self.size += length - Py_DECREF(seq) - - cpdef numpy.ndarray[int64_t, ndim=1] to_numpy(self): - """ - Safely converts the buffer into a NumPy array without causing memory issues. - - Returns: - A NumPy array backed by the buffer's memory (zero-copy). - """ - if self.sealed: - raise RuntimeError("Already exported to NumPy.") - self.sealed = True # Prevent further modification - - # ✅ Create a NumPy array directly from the buffer - cdef numpy.ndarray[int64_t, ndim=1] numpy_array - numpy_array = numpy.PyArray_SimpleNewFromData( - 1, [self.size], numpy.NPY_INT64, self.data - ) - - # ✅ Prevent NumPy from freeing the memory - numpy.PyArray_CLEARFLAGS(numpy_array, numpy.NPY_ARRAY_OWNDATA) - - # ✅ Attach `self` as the BaseObject so NumPy keeps `CIntBuffer` alive - Py_INCREF(self) # Ensure Python keeps a reference - numpy.PyArray_SetBaseObject(numpy_array, self) - - return numpy_array diff --git a/opteryx/connectors/disk_connector.py b/opteryx/connectors/disk_connector.py index cb9ebe48..70b6bd82 100644 --- a/opteryx/connectors/disk_connector.py +++ b/opteryx/connectors/disk_connector.py @@ -32,6 +32,8 @@ # Define os.O_BINARY for non-Windows platforms if it's not already defined if not hasattr(os, "O_BINARY"): os.O_BINARY = 0 # Value has no effect on non-Windows platforms +if not hasattr(os, "O_DIRECT"): + os.O_DIRECT = 0 # Value has no effect on non-Windows platforms def read_blob( @@ -70,7 +72,7 @@ def read_blob( import mmap try: - file_descriptor = os.open(blob_name, os.O_RDONLY | os.O_BINARY) + file_descriptor = os.open(blob_name, os.O_RDONLY | os.O_BINARY | os.O_DIRECT) if hasattr(os, "posix_fadvise"): os.posix_fadvise(file_descriptor, 0, 0, os.POSIX_FADV_WILLNEED) size = os.fstat(file_descriptor).st_size diff --git a/opteryx/functions/string_functions.py b/opteryx/functions/string_functions.py index 022ec589..1cb3bb9a 100644 --- a/opteryx/functions/string_functions.py +++ b/opteryx/functions/string_functions.py @@ -359,11 +359,11 @@ def match_against(arr, val): def regex_replace(array, _pattern, _replacement): import pyarrow - from opteryx.third_party.mrabarnett import regex as re + from opteryx.third_party.mrabarnett import regex pattern = _pattern[0] replacement = _replacement[0] - compiled_pattern = re.compile(pattern) + compiled_pattern = regex.compile(pattern) # Apply the regex replacement to each element in the array vectorized_replace = numpy.vectorize(lambda x: compiled_pattern.sub(replacement, x)) diff --git a/opteryx/operators/__init__.py b/opteryx/operators/__init__.py index 6fd29cb9..aea4273c 100644 --- a/opteryx/operators/__init__.py +++ b/opteryx/operators/__init__.py @@ -27,6 +27,7 @@ # from .information_schema_node import InformationSchemaNode # information_schema from .inner_join_node import InnerJoinNode +from .nested_loop_join_node import NestedLoopJoinNode from .limit_node import LimitNode # select the first N records from .outer_join_node import OuterJoinNode diff --git a/opteryx/operators/inner_join_node.py b/opteryx/operators/inner_join_node.py index 99bbcae4..22a3a4df 100644 --- a/opteryx/operators/inner_join_node.py +++ b/opteryx/operators/inner_join_node.py @@ -35,7 +35,7 @@ from opteryx.compiled.joins.inner_join import abs_hash_join_map from opteryx.compiled.structures import hash_join_map from opteryx.compiled.structures.bloom_filter import create_bloom_filter -from opteryx.compiled.structures.buffers import IntBuffer as IntBuffer +from opteryx.compiled.structures.buffers import IntBuffer from opteryx.models import QueryProperties from opteryx.utils.arrow import align_tables @@ -85,7 +85,6 @@ def __init__(self, properties: QueryProperties, **parameters): self.right_columns = parameters.get("right_columns") self.left_buffer = [] - self.right_buffer = [] self.left_hash = None self.left_filter = None diff --git a/opteryx/operators/nested_loop_join_node.py b/opteryx/operators/nested_loop_join_node.py new file mode 100644 index 00000000..261727c5 --- /dev/null +++ b/opteryx/operators/nested_loop_join_node.py @@ -0,0 +1,74 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# See the License at http://www.apache.org/licenses/LICENSE-2.0 +# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND. + +""" +Inner (Nested Loop) Join Node + +This is a SQL Query Execution Plan Node. + +This is an implementation of a nested loop join, which is a simple join algorithm, it excels +when one of the relations is very small - in this situation it's many times faster than a hash +join as we don't need to create the hash table. + +The Join Order Optimization Strategy will decide if this node should be used, based on the size. +""" + +from threading import Lock + +import numpy +import pyarrow +from pyarrow import Table + +from opteryx import EOS +from opteryx.compiled.joins.inner_join import nested_loop_join +from opteryx.models import QueryProperties +from opteryx.utils.arrow import align_tables + +from . import JoinNode + + +class NestedLoopJoinNode(JoinNode): + join_type = "nested_loop" + + def __init__(self, properties: QueryProperties, **parameters): + JoinNode.__init__(self, properties=properties, **parameters) + + self.left_columns = numpy.array(parameters.get("left_columns"), dtype=numpy.bytes_) + self.right_columns = numpy.array(parameters.get("right_columns"), dtype=numpy.bytes_) + + self.left_relation = None + self.left_buffer = [] + self.lock = Lock() + + @property + def name(self): # pragma: no cover + return "Nested Loop Join" + + @property + def config(self): # pragma: no cover + return "" + + def execute(self, morsel: Table, join_leg: str) -> Table: + with self.lock: + if join_leg == "left": + if morsel == EOS: + self.left_relation = pyarrow.concat_tables( + self.left_buffer, promote_options="none" + ) + self.left_buffer.clear() + else: + self.left_buffer.append(morsel) + yield None + return + + if join_leg == "right": + if morsel == EOS: + yield EOS + return + + left_indexes, right_indexes = nested_loop_join( + self.left_relation, morsel, self.left_columns, self.right_columns + ) + yield align_tables(self.left_relation, morsel, left_indexes, right_indexes) diff --git a/opteryx/planner/binder/binder_visitor.py b/opteryx/planner/binder/binder_visitor.py index 24f95e42..95f09775 100644 --- a/opteryx/planner/binder/binder_visitor.py +++ b/opteryx/planner/binder/binder_visitor.py @@ -715,6 +715,22 @@ def visit_join(self, node: Node, context: BindingContext) -> Tuple[Node, Binding f"CROSS JOIN UNNEST requires an ARRAY type column, not {node.unnest_column.schema_column.type}." ) + # this is very much not how we want to do this, but let's start somewhere + node.left_size = sum( + context.schemas[relation_name].row_count_metric + or context.schemas[relation_name].row_count_estimate + or float("inf") + for relation_name in node.left_relation_names + if relation_name in context.schemas + ) + node.right_size = sum( + context.schemas[relation_name].row_count_metric + or context.schemas[relation_name].row_count_estimate + or float("inf") + for relation_name in node.right_relation_names + if relation_name in context.schemas + ) + if node.type == "inner" and node.on is None: from opteryx.exceptions import SqlError diff --git a/opteryx/planner/optimizer/__init__.py b/opteryx/planner/optimizer/__init__.py index 500b176c..b5a6b0ec 100644 --- a/opteryx/planner/optimizer/__init__.py +++ b/opteryx/planner/optimizer/__init__.py @@ -80,6 +80,7 @@ def __init__(self, statistics: QueryStatistics): PredicateRewriteStrategy(statistics), PredicatePushdownStrategy(statistics), ProjectionPushdownStrategy(statistics), + JoinOrderingStrategy(statistics), DistinctPushdownStrategy(statistics), OperatorFusionStrategy(statistics), LimitPushdownStrategy(statistics), diff --git a/opteryx/planner/optimizer/strategies/__init__.py b/opteryx/planner/optimizer/strategies/__init__.py index 3c09b370..456683ff 100644 --- a/opteryx/planner/optimizer/strategies/__init__.py +++ b/opteryx/planner/optimizer/strategies/__init__.py @@ -2,6 +2,7 @@ from .constant_folding import ConstantFoldingStrategy from .correlated_filters import CorrelatedFiltersStrategy from .distinct_pushdown import DistinctPushdownStrategy +from .join_ordering import JoinOrderingStrategy from .limit_pushdown import LimitPushdownStrategy from .operator_fusion import OperatorFusionStrategy from .predicate_pushdown import PredicatePushdownStrategy @@ -15,6 +16,7 @@ "ConstantFoldingStrategy", "CorrelatedFiltersStrategy", "DistinctPushdownStrategy", + "JoinOrderingStrategy", "LimitPushdownStrategy", "OperatorFusionStrategy", "PredicatePushdownStrategy", diff --git a/opteryx/planner/optimizer/strategies/join_ordering.py b/opteryx/planner/optimizer/strategies/join_ordering.py new file mode 100644 index 00000000..72f42a08 --- /dev/null +++ b/opteryx/planner/optimizer/strategies/join_ordering.py @@ -0,0 +1,46 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# See the License at http://www.apache.org/licenses/LICENSE-2.0 +# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND. + +""" +Optimization Rule - Join Ordering + +Type: Cost-Based +Goal: Faster Joins + +Build a left-deep join tree, where the left relation of any pair is the smaller relation. + +We also decide if we should use a nested loop join or a hash join based on the size of the left relation. +""" + +from opteryx.planner.logical_planner import LogicalPlan +from opteryx.planner.logical_planner import LogicalPlanNode +from opteryx.planner.logical_planner import LogicalPlanStepType + +from .optimization_strategy import OptimizationStrategy +from .optimization_strategy import OptimizerContext +from .optimization_strategy import get_nodes_of_type_from_logical_plan + + +class JoinOrderingStrategy(OptimizationStrategy): + def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerContext: + if not context.optimized_plan: + context.optimized_plan = context.pre_optimized_tree.copy() # type: ignore + + if node.node_type == LogicalPlanStepType.Join and node.type == "inner": + # Tiny datasets benefit from nested loop joins (avoids building a hash table) + if min(node.left_size, node.right_size) < 1000: + node.type = "nested_inner" + context.optimized_plan[context.node_id] = node + + return context + + def complete(self, plan: LogicalPlan, context: OptimizerContext) -> LogicalPlan: + # No finalization needed for this strategy + return plan + + def should_i_run(self, plan): + # only run if there are LIMIT clauses in the plan + candidates = get_nodes_of_type_from_logical_plan(plan, (LogicalPlanStepType.Join,)) + return len(candidates) > 0 diff --git a/opteryx/planner/physical_planner.py b/opteryx/planner/physical_planner.py index bfcd98fe..fda3a243 100644 --- a/opteryx/planner/physical_planner.py +++ b/opteryx/planner/physical_planner.py @@ -11,7 +11,7 @@ from opteryx.models import PhysicalPlan from opteryx.planner.logical_planner import LogicalPlanStepType -ENABLE_TWO_PART_AGGREGATOR: bool = features.enable_two_part_aggregator +ENABLE_TWO_PART_AGGREGATOR: bool = features.enable_two_part_aggregator or True def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan: @@ -49,6 +49,9 @@ def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan: if node_config.get("type") == "inner": # INNER JOIN, NATURAL JOIN node = operators.InnerJoinNode(query_properties, **node_config) + elif node_config.get("type") == "nested_inner": + # INNER JOIN, NATURAL JOIN + node = operators.NestedLoopJoinNode(query_properties, **node_config) elif node_config.get("type") in ("left outer", "full outer", "right outer"): # LEFT JOIN, RIGHT JOIN, FULL JOIN node = operators.OuterJoinNode(query_properties, **node_config)