Skip to content

Commit

Permalink
Merge pull request #1972 from mabel-dev/#1917-2
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Sep 1, 2024
2 parents a206088 + 5b1ae83 commit e0bfb00
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 20 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 760
__build__ = 761

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
1 change: 1 addition & 0 deletions opteryx/compiled/structures/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .hash_table import HashSet
from .hash_table import HashTable
from .hash_table import distinct
from .hash_table import list_distinct
from .memory_pool import MemoryPool
from .node import Node
10 changes: 10 additions & 0 deletions opteryx/compiled/structures/hash_table.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ cpdef tuple distinct(table, HashSet seen_hashes=None, list columns=None):

return (keep, seen_hashes)

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef tuple list_distinct(list values, cnp.ndarray indices, HashSet seen_hashes=None):
new_indices = []
new_values = []
for i, v in enumerate(values):
if seen_hashes.insert(hash(v)):
new_values.append(v)
new_indices.append(indices[i])
return new_values, new_indices, seen_hashes

@cython.boundscheck(False)
@cython.wraparound(False)
Expand Down
49 changes: 40 additions & 9 deletions opteryx/operators/cross_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def _cross_join_unnest_column(
target_column: FlatColumn = None,
conditions: Set = None,
statistics=None,
distinct: bool = False,
single_column: bool = False,
) -> Generator[pyarrow.Table, None, None]:
"""
Perform a cross join on an unnested column of pyarrow tables.
Expand All @@ -61,6 +63,10 @@ def _cross_join_unnest_column(
"""
from opteryx.compiled.cross_join import build_filtered_rows_indices_and_column
from opteryx.compiled.cross_join import build_rows_indices_and_column
from opteryx.compiled.structures import HashSet
from opteryx.compiled.structures import list_distinct

hash_set = HashSet()

# Check if the source node type is an identifier, raise error otherwise
if source.node_type != NodeType.IDENTIFIER:
Expand Down Expand Up @@ -99,16 +105,33 @@ def _cross_join_unnest_column(
column_data.to_numpy(False), conditions
)

# Rebuild the block with the new column data
new_block = left_block.take(indices)
new_block = pyarrow.Table.from_batches([new_block], schema=left_morsel.schema)
new_block = new_block.append_column(target_column.identity, [new_column_data])
if single_column and distinct and indices.size > 0:
# if the unnest target is the only field in the SELECT and we're DISTINCTING
new_column_data, indices, hash_set = list_distinct(
new_column_data, indices, hash_set
)

statistics.time_cross_join_unnest += time.monotonic_ns() - start
if new_block.num_rows > 0:
yield new_block
at_least_once = True
start = time.monotonic_ns()
if len(indices) > 0:
if single_column:
schema = pyarrow.schema(
[
pyarrow.field(
name=target_column.identity, type=target_column.arrow_field.type
)
]
)
new_block = pyarrow.Table.from_arrays([new_column_data], schema=schema)
else:
# Rebuild the block with the new column data if we have any rows to build for
new_block = left_block.take(indices)
new_block = pyarrow.Table.from_batches([new_block], schema=left_morsel.schema)
new_block = new_block.append_column(target_column.identity, [new_column_data])

statistics.time_cross_join_unnest += time.monotonic_ns() - start
if new_block.num_rows > 0:
yield new_block
at_least_once = True
start = time.monotonic_ns()

if not at_least_once:
# Create an empty table with the new schema
Expand Down Expand Up @@ -208,6 +231,7 @@ class CrossJoinDataObject(BasePlanDataObject):
_unnest_column: str = None
_unnest_target: str = None
_filters: str = None
_distinct: bool = False


class CrossJoinNode(BasePlanNode):
Expand All @@ -229,6 +253,7 @@ def __init__(self, properties: QueryProperties, **config):
self._unnest_column = config.get("unnest_column")
self._unnest_target = config.get("unnest_target")
self._filters = config.get("filters")
self._distinct = config.get("distinct", False)

# handle variation in how the unnested column is represented
if self._unnest_column:
Expand All @@ -240,6 +265,10 @@ def __init__(self, properties: QueryProperties, **config):
):
self._unnest_column.value = tuple([self._unnest_column.value])

self._single_column = config.get("pre_update_columns", set()) == {
self._unnest_target.identity,
}

@classmethod
def from_json(cls, json_obj: str) -> "BasePlanNode": # pragma: no cover
raise NotImplementedError()
Expand Down Expand Up @@ -279,4 +308,6 @@ def execute(self) -> Generator:
target_column=self._unnest_target,
conditions=self._filters,
statistics=self.statistics,
distinct=self._distinct,
single_column=self._single_column,
)
1 change: 1 addition & 0 deletions opteryx/planner/cost_based_optimizer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(self):
"""
self.strategies = [
ConstantFoldingStrategy(),
# DistinctPushdownStrategy(),
BooleanSimplificationStrategy(),
SplitConjunctivePredicatesStrategy(),
PredicateRewriteStrategy(),
Expand Down
2 changes: 2 additions & 0 deletions opteryx/planner/cost_based_optimizer/strategies/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .boolean_simplication import BooleanSimplificationStrategy
from .constant_folding import ConstantFoldingStrategy
from .distinct_pushdown import DistinctPushdownStrategy
from .operator_fusion import OperatorFusionStrategy
from .predicate_pushdown import PredicatePushdownStrategy
from .predicate_rewriter import PredicateRewriteStrategy
Expand All @@ -10,6 +11,7 @@
__all__ = [
"BooleanSimplificationStrategy",
"ConstantFoldingStrategy",
"DistinctPushdownStrategy",
"OperatorFusionStrategy",
"PredicatePushdownStrategy",
"PredicateRewriteStrategy",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Optimization Rule - Distinct Pushdown
Type: Heuristic
Goal: Reduce Rows
Rules:
- DISTINCT ON can't get pushed
"""

from orso.tools import random_string

from opteryx.managers.expression import NodeType
from opteryx.managers.expression import get_all_nodes_of_type
from opteryx.models import LogicalColumn
from opteryx.planner.logical_planner import LogicalPlan
from opteryx.planner.logical_planner import LogicalPlanNode
from opteryx.planner.logical_planner import LogicalPlanStepType

from .optimization_strategy import OptimizationStrategy
from .optimization_strategy import OptimizerContext

PASSABLE_AGGREGATIONS = ("MIN", "MAX")
"""
Aggregations we can push the DISTINCT past
"""


class DistinctPushdownStrategy(OptimizationStrategy):
def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerContext:
""" """
if not context.optimized_plan:
context.optimized_plan = context.pre_optimized_tree.copy() # type: ignore

if (node.node_type == LogicalPlanStepType.Distinct) and node.on is None:
node.nid = context.node_id
node.plan_path = context.optimized_plan.trace_to_root(context.node_id)
context.collected_distincts.append(node)
context.optimized_plan.remove_node(context.node_id, heal=True)
return context

if (
node.node_type == LogicalPlanStepType.Join
and context.collected_distincts
and node.type == "cross join"
and node.unnest_target is not None
):
node.distinct = True
context.optimized_plan[context.node_id] = node
context.collected_distincts.clear()
return context

if node.node_type in (
LogicalPlanStepType.Join,
LogicalPlanStepType.Scan,
LogicalPlanStepType.AggregateAndGroup,
LogicalPlanStepType.Aggregate,
):
# anything we couldn't push, we need to put back
for distinct in context.collected_distincts:
for nid in distinct.plan_path:
if nid in context.optimized_plan:
context.optimized_plan.insert_node_before(distinct.nid, distinct, nid)
break
return context

return context

def complete(self, plan: LogicalPlan, context: OptimizerContext) -> LogicalPlan:
# No finalization needed for this strategy
return plan
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@
from opteryx.planner.logical_planner import LogicalPlanNode


# Context object to carry state
class OptimizerContext:
"""Context object to carry state"""

def __init__(self, tree: LogicalPlan):
self.node_id = None
self.parent_nid = None
self.last_nid = None
self.pre_optimized_tree = tree
self.optimized_plan = LogicalPlan()

# We collect predicates we should be able to push to reads and joins
self.collected_predicates: list = []
"""We collect predicates we should be able to push to reads and joins"""

# We collect column identities so we can push column selection as close to the
# read as possible, including off to remote systems
self.collected_identities: set = set()
"""We collect column identities so we can push column selection as close to the read as possible, including off to remote systems"""

self.collected_distincts: list = []
"""We collect distincts to try to eliminate records earlier"""


class OptimizationStrategy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerCo
Returns:
A tuple containing the potentially modified node and the updated context.
"""
node.pre_update_columns = set(context.collected_identities)
if node.columns: # Assumes node.columns is an iterable or None
collected_columns = self.collect_columns(node)
context.collected_identities.update(collected_columns)
Expand Down
11 changes: 5 additions & 6 deletions opteryx/planner/logical_planner/logical_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,13 @@ def __str__(self):
if node_type == LogicalPlanStepType.Filter:
return f"FILTER ({format_expression(self.condition)})"
if node_type == LogicalPlanStepType.Join:
filters = ""
if self.filters:
filters = f"({self.unnest_alias} IN ({', '.join(self.filters)}))"
distinct = " DISTINCT" if self.distinct else ""
filters = f"({self.unnest_alias} IN ({', '.join(self.filters)}))" if self.filters else ""
if self.on:
return f"{self.type.upper()} JOIN ({format_expression(self.on, True)}){filters}"
return f"{self.type.upper()} JOIN{distinct} ({format_expression(self.on, True)}){filters}"
if self.using:
return f"{self.type.upper()} JOIN (USING {','.join(map(format_expression, self.using))}){filters}"
return f"{self.type.upper()} {filters}"
return f"{self.type.upper()} JOIN{distinct} (USING {','.join(map(format_expression, self.using))}){filters}"
return f"{self.type.upper()}{distinct} {filters}"
if node_type == LogicalPlanStepType.HeapSort:
return f"HEAP SORT (LIMIT {self.limit}, ORDER BY {', '.join(format_expression(item[0]) + (' DESC' if item[1] =='descending' else '') for item in self.order_by)})"
if node_type == LogicalPlanStepType.Limit:
Expand Down

0 comments on commit e0bfb00

Please sign in to comment.