From e8afb8afc5da1421bd9809b7ca95f5045ffd43b2 Mon Sep 17 00:00:00 2001 From: joocer Date: Sun, 1 Sep 2024 22:10:41 +0100 Subject: [PATCH 1/2] #1917 --- opteryx/compiled/structures/__init__.py | 1 + opteryx/compiled/structures/hash_table.pyx | 10 +++ opteryx/operators/cross_join_node.py | 49 +++++++++-- .../planner/cost_based_optimizer/__init__.py | 1 + .../strategies/__init__.py | 2 + .../strategies/distinct_pushdown.py | 83 +++++++++++++++++++ .../strategies/optimization_strategy.py | 11 ++- .../strategies/projection_pushdown.py | 1 + .../logical_planner/logical_planner.py | 11 ++- 9 files changed, 150 insertions(+), 19 deletions(-) create mode 100644 opteryx/planner/cost_based_optimizer/strategies/distinct_pushdown.py diff --git a/opteryx/compiled/structures/__init__.py b/opteryx/compiled/structures/__init__.py index d8c1e398e..691eccc56 100644 --- a/opteryx/compiled/structures/__init__.py +++ b/opteryx/compiled/structures/__init__.py @@ -1,5 +1,6 @@ from .hash_table import HashSet from .hash_table import HashTable from .hash_table import distinct +from .hash_table import list_distinct from .memory_pool import MemoryPool from .node import Node diff --git a/opteryx/compiled/structures/hash_table.pyx b/opteryx/compiled/structures/hash_table.pyx index 4bc81d0f3..c8ec3a6d4 100644 --- a/opteryx/compiled/structures/hash_table.pyx +++ b/opteryx/compiled/structures/hash_table.pyx @@ -103,6 +103,16 @@ cpdef tuple distinct(table, HashSet seen_hashes=None, list columns=None): return (keep, seen_hashes) +@cython.boundscheck(False) +@cython.wraparound(False) +cpdef tuple list_distinct(list values, cnp.ndarray indices, HashSet seen_hashes=None): + new_indices = [] + new_values = [] + for i, v in enumerate(values): + if seen_hashes.insert(hash(v)): + new_values.append(v) + new_indices.append(indices[i]) + return new_values, new_indices, seen_hashes @cython.boundscheck(False) @cython.wraparound(False) diff --git a/opteryx/operators/cross_join_node.py b/opteryx/operators/cross_join_node.py index bba4db867..15c634bae 100644 --- a/opteryx/operators/cross_join_node.py +++ b/opteryx/operators/cross_join_node.py @@ -47,6 +47,8 @@ def _cross_join_unnest_column( target_column: FlatColumn = None, conditions: Set = None, statistics=None, + distinct: bool = False, + single_column: bool = False, ) -> Generator[pyarrow.Table, None, None]: """ Perform a cross join on an unnested column of pyarrow tables. @@ -61,6 +63,10 @@ def _cross_join_unnest_column( """ from opteryx.compiled.cross_join import build_filtered_rows_indices_and_column from opteryx.compiled.cross_join import build_rows_indices_and_column + from opteryx.compiled.structures import HashSet + from opteryx.compiled.structures import list_distinct + + hash_set = HashSet() # Check if the source node type is an identifier, raise error otherwise if source.node_type != NodeType.IDENTIFIER: @@ -99,16 +105,33 @@ def _cross_join_unnest_column( column_data.to_numpy(False), conditions ) - # Rebuild the block with the new column data - new_block = left_block.take(indices) - new_block = pyarrow.Table.from_batches([new_block], schema=left_morsel.schema) - new_block = new_block.append_column(target_column.identity, [new_column_data]) + if single_column and distinct and indices.size > 0: + # if the unnest target is the only field in the SELECT and we're DISTINCTING + new_column_data, indices, hash_set = list_distinct( + new_column_data, indices, hash_set + ) - statistics.time_cross_join_unnest += time.monotonic_ns() - start - if new_block.num_rows > 0: - yield new_block - at_least_once = True - start = time.monotonic_ns() + if len(indices) > 0: + if single_column: + schema = pyarrow.schema( + [ + pyarrow.field( + name=target_column.identity, type=target_column.arrow_field.type + ) + ] + ) + new_block = pyarrow.Table.from_arrays([new_column_data], schema=schema) + else: + # Rebuild the block with the new column data if we have any rows to build for + new_block = left_block.take(indices) + new_block = pyarrow.Table.from_batches([new_block], schema=left_morsel.schema) + new_block = new_block.append_column(target_column.identity, [new_column_data]) + + statistics.time_cross_join_unnest += time.monotonic_ns() - start + if new_block.num_rows > 0: + yield new_block + at_least_once = True + start = time.monotonic_ns() if not at_least_once: # Create an empty table with the new schema @@ -208,6 +231,7 @@ class CrossJoinDataObject(BasePlanDataObject): _unnest_column: str = None _unnest_target: str = None _filters: str = None + _distinct: bool = False class CrossJoinNode(BasePlanNode): @@ -229,6 +253,7 @@ def __init__(self, properties: QueryProperties, **config): self._unnest_column = config.get("unnest_column") self._unnest_target = config.get("unnest_target") self._filters = config.get("filters") + self._distinct = config.get("distinct", False) # handle variation in how the unnested column is represented if self._unnest_column: @@ -240,6 +265,10 @@ def __init__(self, properties: QueryProperties, **config): ): self._unnest_column.value = tuple([self._unnest_column.value]) + self._single_column = config.get("pre_update_columns", set()) == { + self._unnest_target.identity, + } + @classmethod def from_json(cls, json_obj: str) -> "BasePlanNode": # pragma: no cover raise NotImplementedError() @@ -279,4 +308,6 @@ def execute(self) -> Generator: target_column=self._unnest_target, conditions=self._filters, statistics=self.statistics, + distinct=self._distinct, + single_column=self._single_column, ) diff --git a/opteryx/planner/cost_based_optimizer/__init__.py b/opteryx/planner/cost_based_optimizer/__init__.py index f5764894f..1eb364460 100644 --- a/opteryx/planner/cost_based_optimizer/__init__.py +++ b/opteryx/planner/cost_based_optimizer/__init__.py @@ -80,6 +80,7 @@ def __init__(self): """ self.strategies = [ ConstantFoldingStrategy(), + # DistinctPushdownStrategy(), BooleanSimplificationStrategy(), SplitConjunctivePredicatesStrategy(), PredicateRewriteStrategy(), diff --git a/opteryx/planner/cost_based_optimizer/strategies/__init__.py b/opteryx/planner/cost_based_optimizer/strategies/__init__.py index 36ed89b87..6820472f7 100644 --- a/opteryx/planner/cost_based_optimizer/strategies/__init__.py +++ b/opteryx/planner/cost_based_optimizer/strategies/__init__.py @@ -1,5 +1,6 @@ from .boolean_simplication import BooleanSimplificationStrategy from .constant_folding import ConstantFoldingStrategy +from .distinct_pushdown import DistinctPushdownStrategy from .operator_fusion import OperatorFusionStrategy from .predicate_pushdown import PredicatePushdownStrategy from .predicate_rewriter import PredicateRewriteStrategy @@ -10,6 +11,7 @@ __all__ = [ "BooleanSimplificationStrategy", "ConstantFoldingStrategy", + "DistinctPushdownStrategy", "OperatorFusionStrategy", "PredicatePushdownStrategy", "PredicateRewriteStrategy", diff --git a/opteryx/planner/cost_based_optimizer/strategies/distinct_pushdown.py b/opteryx/planner/cost_based_optimizer/strategies/distinct_pushdown.py new file mode 100644 index 000000000..eb3518e4d --- /dev/null +++ b/opteryx/planner/cost_based_optimizer/strategies/distinct_pushdown.py @@ -0,0 +1,83 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Optimization Rule - Distinct Pushdown + +Type: Heuristic +Goal: Reduce Rows + +Rules: + - DISTINCT ON can't get pushed +""" + +from orso.tools import random_string + +from opteryx.managers.expression import NodeType +from opteryx.managers.expression import get_all_nodes_of_type +from opteryx.models import LogicalColumn +from opteryx.planner.logical_planner import LogicalPlan +from opteryx.planner.logical_planner import LogicalPlanNode +from opteryx.planner.logical_planner import LogicalPlanStepType + +from .optimization_strategy import OptimizationStrategy +from .optimization_strategy import OptimizerContext + +PASSABLE_AGGREGATIONS = ("MIN", "MAX") +""" +Aggregations we can push the DISTINCT past +""" + + +class DistinctPushdownStrategy(OptimizationStrategy): + def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerContext: + """ """ + if not context.optimized_plan: + context.optimized_plan = context.pre_optimized_tree.copy() # type: ignore + + if (node.node_type == LogicalPlanStepType.Distinct) and node.on is None: + node.nid = context.node_id + node.plan_path = context.optimized_plan.trace_to_root(context.node_id) + context.collected_distincts.append(node) + context.optimized_plan.remove_node(context.node_id, heal=True) + return context + + if ( + node.node_type == LogicalPlanStepType.Join + and context.collected_distincts + and node.type == "cross join" + and node.unnest_target is not None + ): + node.distinct = True + context.optimized_plan[context.node_id] = node + context.collected_distincts.clear() + return context + + if node.node_type in ( + LogicalPlanStepType.Join, + LogicalPlanStepType.Scan, + LogicalPlanStepType.AggregateAndGroup, + LogicalPlanStepType.Aggregate, + ): + # anything we couldn't push, we need to put back + for distinct in context.collected_distincts: + for nid in distinct.plan_path: + if nid in context.optimized_plan: + context.optimized_plan.insert_node_before(distinct.nid, distinct, nid) + break + return context + + return context + + def complete(self, plan: LogicalPlan, context: OptimizerContext) -> LogicalPlan: + # No finalization needed for this strategy + return plan diff --git a/opteryx/planner/cost_based_optimizer/strategies/optimization_strategy.py b/opteryx/planner/cost_based_optimizer/strategies/optimization_strategy.py index a02775b85..01eb4f477 100644 --- a/opteryx/planner/cost_based_optimizer/strategies/optimization_strategy.py +++ b/opteryx/planner/cost_based_optimizer/strategies/optimization_strategy.py @@ -14,8 +14,9 @@ from opteryx.planner.logical_planner import LogicalPlanNode -# Context object to carry state class OptimizerContext: + """Context object to carry state""" + def __init__(self, tree: LogicalPlan): self.node_id = None self.parent_nid = None @@ -23,12 +24,14 @@ def __init__(self, tree: LogicalPlan): self.pre_optimized_tree = tree self.optimized_plan = LogicalPlan() - # We collect predicates we should be able to push to reads and joins self.collected_predicates: list = [] + """We collect predicates we should be able to push to reads and joins""" - # We collect column identities so we can push column selection as close to the - # read as possible, including off to remote systems self.collected_identities: set = set() + """We collect column identities so we can push column selection as close to the read as possible, including off to remote systems""" + + self.collected_distincts: list = [] + """We collect distincts to try to eliminate records earlier""" class OptimizationStrategy: diff --git a/opteryx/planner/cost_based_optimizer/strategies/projection_pushdown.py b/opteryx/planner/cost_based_optimizer/strategies/projection_pushdown.py index 4d0559aa9..1ff6ba9ba 100644 --- a/opteryx/planner/cost_based_optimizer/strategies/projection_pushdown.py +++ b/opteryx/planner/cost_based_optimizer/strategies/projection_pushdown.py @@ -35,6 +35,7 @@ def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerCo Returns: A tuple containing the potentially modified node and the updated context. """ + node.pre_update_columns = set(context.collected_identities) if node.columns: # Assumes node.columns is an iterable or None collected_columns = self.collect_columns(node) context.collected_identities.update(collected_columns) diff --git a/opteryx/planner/logical_planner/logical_planner.py b/opteryx/planner/logical_planner/logical_planner.py index 60ba9f055..0e68d25fa 100644 --- a/opteryx/planner/logical_planner/logical_planner.py +++ b/opteryx/planner/logical_planner/logical_planner.py @@ -101,14 +101,13 @@ def __str__(self): if node_type == LogicalPlanStepType.Filter: return f"FILTER ({format_expression(self.condition)})" if node_type == LogicalPlanStepType.Join: - filters = "" - if self.filters: - filters = f"({self.unnest_alias} IN ({', '.join(self.filters)}))" + distinct = " DISTINCT" if self.distinct else "" + filters = f"({self.unnest_alias} IN ({', '.join(self.filters)}))" if self.filters else "" if self.on: - return f"{self.type.upper()} JOIN ({format_expression(self.on, True)}){filters}" + return f"{self.type.upper()} JOIN{distinct} ({format_expression(self.on, True)}){filters}" if self.using: - return f"{self.type.upper()} JOIN (USING {','.join(map(format_expression, self.using))}){filters}" - return f"{self.type.upper()} {filters}" + return f"{self.type.upper()} JOIN{distinct} (USING {','.join(map(format_expression, self.using))}){filters}" + return f"{self.type.upper()}{distinct} {filters}" if node_type == LogicalPlanStepType.HeapSort: return f"HEAP SORT (LIMIT {self.limit}, ORDER BY {', '.join(format_expression(item[0]) + (' DESC' if item[1] =='descending' else '') for item in self.order_by)})" if node_type == LogicalPlanStepType.Limit: From 5b1ae831ee049a1069b5244662326b2384481a15 Mon Sep 17 00:00:00 2001 From: XB500 Date: Sun, 1 Sep 2024 21:11:13 +0000 Subject: [PATCH 2/2] Opteryx Version 0.17.0-alpha.761 --- opteryx/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opteryx/__version__.py b/opteryx/__version__.py index ef685f999..f3f7fa02e 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 760 +__build__ = 761 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License.