Skip to content

Commit

Permalink
0.12.0-beta.9
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Dec 2, 2023
1 parent c3c306c commit 4bbba6b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
"""

# __version__ = "0.4.0-alpha.6"
__version__ = "0.12.0-beta.8"
__version__ = "0.12.0-beta.9"
48 changes: 33 additions & 15 deletions opteryx/connectors/sql_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
# limitations under the License.

"""
The SQL Connector downloads data from remote servers and converts them
to pyarrow tables so they can be processed as per any other data source.
"""
from decimal import Decimal
from typing import Generator

import pyarrow
from orso import DataFrame
from orso import Row
from orso.schema import FlatColumn
from orso.schema import RelationSchema
from orso.types import PYTHON_TO_ORSO_MAP
Expand Down Expand Up @@ -55,38 +59,52 @@ def __init__(self, *args, connection: str = None, engine=None, **kwargs):

def read_dataset(
self, columns: list = None, chunk_size: int = INITIAL_CHUNK_SIZE
) -> "DatasetReader":
) -> Generator[pyarrow.Table, None, None]:
from sqlalchemy.sql import text

self.chunk_size = chunk_size
result_schema = self.schema

query_builder = Query().FROM(self.dataset)

# if we're projecting, update the SQL and the target morsel schema
# Update the SQL and the target morsel schema if projecting
if columns:
column_names = [col.name for col in columns]
query_builder.add("SELECT", *column_names)
result_schema.columns = [col for col in self.schema.columns if col.name in column_names]
else:
query_builder.add("SELECT", "*")

# Use orso as an intermediatary, it's row-based so is well suited to processing
# records coming back from a SQL query, and it has a well-optimized to arrow
# converter to create pyarrow Tables
morsel = DataFrame(schema=result_schema)
row_factory = Row.create_class(result_schema, tuples_only=True)

with self._engine.connect() as conn:
# DEBUG: log ("READ DATASET\n", str(query_builder))
for row in conn.execute(text(str(query_builder))):
morsel._rows.append(row)
if len(morsel) == self.chunk_size:
yield morsel.arrow()

if morsel.nbytes > 0:
self.chunk_size = int(len(morsel) // (morsel.nbytes / DEFAULT_MORSEL_SIZE))

morsel = DataFrame(schema=result_schema)

if len(morsel) > 0:
yield morsel.arrow()
result = conn.execute(text(str(query_builder)))

while True:
batch_rows = result.fetchmany(self.chunk_size)
if not batch_rows:
break

# convert each SqlAlchemy Row to an orso Row
for row in batch_rows:
morsel._rows.append(row_factory(row))
yield morsel.arrow()

# Dynamically adjust chunk size based on the data size, we start by downloading
# 500 records to get an idea of the row size, assuming these 500 are
# representative, we work out how many rows fit into 8Mb.
# Don't keep recalculating, this is not a cheap operation and it's predicting
# the future so isn't going to ever be 100% correct
if self.chunk_size == INITIAL_CHUNK_SIZE and morsel.nbytes() > 0:
self.chunk_size = int(len(morsel) // (morsel.nbytes() / DEFAULT_MORSEL_SIZE))
# DEBUG: log (f"CHANGING CHUNK SIZE TO {self.chunk_size} was {INITIAL_CHUNK_SIZE}.")

morsel = DataFrame(schema=result_schema)

def get_dataset_schema(self) -> RelationSchema:
from sqlalchemy import Table
Expand Down

0 comments on commit 4bbba6b

Please sign in to comment.