Skip to content

Commit

Permalink
Fix #19634: use system.query.history table for lineage - databricks
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 committed Feb 5, 2025
1 parent 1c61236 commit 7abefb5
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from metadata.ingestion.source.database.databricks.client import DatabricksClient
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_CATALOGS,
DATABRICKS_SQL_STATEMENT_TEST,
)
from metadata.utils.constants import THREE_MIN
from metadata.utils.logger import ingestion_logger
Expand Down Expand Up @@ -106,7 +107,13 @@ def test_database_query(engine: Engine, statement: str):
engine=connection,
statement=DATABRICKS_GET_CATALOGS,
),
"GetQueries": client.test_query_api_access,
"GetQueries": partial(
test_database_query,
engine=connection,
statement=DATABRICKS_SQL_STATEMENT_TEST.format(
query_history=service_connection.queryHistoryTable
),
),
}

return test_connection_steps(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
"""
Databricks lineage module
"""
import traceback
from datetime import datetime
from typing import Iterator

from metadata.generated.schema.type.basic import DateTime
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_SQL_STATEMENT,
)
from metadata.ingestion.source.database.databricks.query_parser import (
DatabricksQueryParserSource,
)
Expand All @@ -31,23 +29,13 @@ class DatabricksLineageSource(DatabricksQueryParserSource, LineageSource):
Databricks Lineage Legacy Source
"""

def yield_table_query(self) -> Iterator[TableQuery]:
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
sql_stmt = DATABRICKS_SQL_STATEMENT

filters = """
AND (
lower(statement_text) LIKE '%%create%%select%%'
OR lower(statement_text) LIKE '%%insert%%into%%select%%'
OR lower(statement_text) LIKE '%%update%%'
OR lower(statement_text) LIKE '%%merge%%'
)
for row in data or []:
try:
if self.client.is_query_valid(row):
yield TableQuery(
dialect=self.dialect.value,
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=str(row.get("query_start_time_ms")),
endTime=str(row.get("execution_end_time_ms")),
analysisDate=DateTime(datetime.now()),
serviceName=self.config.serviceName,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing query_dict {row}: {exc}")
"""
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,30 @@

import textwrap

DATABRICKS_SQL_STATEMENT = textwrap.dedent(
"""
SELECT
statement_type AS query_type,
statement_text AS query_text,
executed_by AS user_name,
start_time AS start_time,
null AS database_name,
null AS schema_name,
end_time AS end_time,
total_duration_ms/1000 AS duration
from {query_history}
WHERE statement_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND statement_text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
AND start_time between to_timestamp('{start_time}') and to_timestamp('{end_time}')
{filters}
LIMIT {result_limit}
"""
)

DATABRICKS_SQL_STATEMENT_TEST = """
SELECT statement_text from {query_history} LIMIT 1
"""

DATABRICKS_VIEW_DEFINITIONS = textwrap.dedent(
"""
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.client import DatabricksClient
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils.logger import ingestion_logger

Expand All @@ -36,18 +35,6 @@ class DatabricksQueryParserSource(QueryParserSource, ABC):

filters: str

def _init_super(
    self,
    config: WorkflowSource,
    metadata: OpenMetadata,
):
    # Forward to the parent initializer with a third positional argument of
    # False — presumably a "get_engine"-style flag since this source uses the
    # Databricks REST client instead of a SQLAlchemy engine; TODO confirm
    # against QueryParserSource.__init__.
    super().__init__(config, metadata, False)

# pylint: disable=super-init-not-called
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
    """Initialize the parser source and attach the Databricks REST client."""
    # super().__init__ is invoked indirectly through _init_super, hence the
    # pylint suppression above.
    self._init_super(config=config, metadata=metadata)
    # NOTE(review): self.service_connection is presumably populated by the
    # parent initializer — verify before relying on it here.
    self.client = DatabricksClient(self.service_connection)

@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
Expand All @@ -61,7 +48,16 @@ def create(
)
return cls(config, metadata)

def prepare(self):
def get_sql_statement(self, start_time, end_time):
    """
    Build the SQL statement that fetches query logs for the given window.

    Fills the placeholders of ``self.sql_stmt``: the time bounds, the
    source-specific filters, the configured result limit, and the
    query-history table from the service connection.
    """
    return self.sql_stmt.format(
        start_time=start_time,
        end_time=end_time,
        filters=self.get_filters(),
        result_limit=self.source_config.resultLimit,
        query_history=self.service_connection.queryHistoryTable,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
"""
Databricks usage module
"""
import traceback
from datetime import datetime
from typing import Iterable

from metadata.generated.schema.type.basic import DateTime
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_SQL_STATEMENT,
)
from metadata.ingestion.source.database.databricks.query_parser import (
DatabricksQueryParserSource,
)
Expand All @@ -31,36 +29,8 @@ class DatabricksUsageSource(DatabricksQueryParserSource, UsageSource):
Databricks Usage Source
"""

def yield_table_queries(self) -> Iterable[TableQuery]:
"""
Method to yield TableQueries
"""
queries = []
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data or []:
try:
if self.client.is_query_valid(row):
queries.append(
TableQuery(
dialect=self.dialect.value,
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=str(row.get("query_start_time_ms")),
endTime=str(row.get("execution_end_time_ms")),
analysisDate=DateTime(datetime.now()),
serviceName=self.config.serviceName,
duration=row.get("duration")
if row.get("duration")
else None,
)
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to process query {row.get('query_text')} due to: {err}"
)
sql_stmt = DATABRICKS_SQL_STATEMENT

yield TableQueries(queries=queries)
filters = """
AND statement_type NOT IN ('SHOW', 'DESCRIBE', 'USE')
"""
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException, Source
Expand Down Expand Up @@ -111,7 +112,9 @@ def _get_lineage_details(
)
)
if col_lineage:
return LineageDetails(columnsLineage=col_lineage)
return LineageDetails(
columnsLineage=col_lineage, source=LineageSource.QueryLineage
)
return None

def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ class UnityCatalogQueryParserSource(

filters: str

def _init_super(
    self,
    config: WorkflowSource,
    metadata: OpenMetadata,
):
    # Forward to the parent initializer with a third positional argument of
    # False — presumably a "get_engine"-style flag since Unity Catalog talks
    # to the REST API rather than a SQLAlchemy engine; TODO confirm against
    # the parent class __init__ signature.
    super().__init__(config, metadata, False)

# pylint: disable=super-init-not-called
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
self._init_super(config=config, metadata=metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,60 @@
"""
unity catalog usage module
"""
import traceback
from datetime import datetime
from typing import Iterable

from metadata.ingestion.source.database.databricks.usage import DatabricksUsageSource
from metadata.generated.schema.type.basic import DateTime
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.source.database.unitycatalog.query_parser import (
UnityCatalogQueryParserSource,
)
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class UnitycatalogUsageSource(UnityCatalogQueryParserSource, DatabricksUsageSource):
class UnitycatalogUsageSource(UnityCatalogQueryParserSource, UsageSource):
    """
    UnityCatalog Usage Source.

    Pulls the query history through the Databricks REST client
    (``self.client.list_query_history``) and converts each valid entry into a
    ``TableQuery`` for usage processing.
    """

    def yield_table_queries(self) -> Iterable[TableQuery]:
        """
        Yield a single ``TableQueries`` batch built from the client's query
        history between ``self.start`` and ``self.end``.

        Rows rejected by ``is_query_valid`` are skipped; rows that raise
        during conversion are logged and skipped (best-effort ingestion).
        """
        queries = []
        data = self.client.list_query_history(
            start_date=self.start,
            end_date=self.end,
        )
        for row in data or []:
            try:
                if self.client.is_query_valid(row):
                    queries.append(
                        TableQuery(
                            dialect=self.dialect.value,
                            query=row.get("query_text"),
                            userName=row.get("user_name"),
                            # field names suggest epoch millis; TableQuery
                            # takes them as strings
                            startTime=str(row.get("query_start_time_ms")),
                            endTime=str(row.get("execution_end_time_ms")),
                            analysisDate=DateTime(datetime.now()),
                            serviceName=self.config.serviceName,
                            # normalize falsy durations (0 or missing) to None
                            duration=row.get("duration") or None,
                        )
                    )
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.warning(
                    f"Failed to process query {row.get('query_text')} due to: {err}"
                )

        yield TableQueries(queries=queries)
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@
"type": "integer",
"default": 120
},
"queryHistoryTable":{
"title": "Query History Table",
"description": "Table name to fetch the query history.",
"type": "string",
"default": "system.query.history"
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/


/**
/**
* Databricks Connection Config
*/
export interface DatabricksConnection {
Expand Down Expand Up @@ -43,7 +41,11 @@ export interface DatabricksConnection {
/**
* Databricks compute resources URL.
*/
httpPath?: string;
httpPath?: string;
/**
* Table name to fetch the query history.
*/
queryHistoryTable?: string;
sampleDataStorageConfig?: SampleDataStorageConfig;
/**
* SQLAlchemy driver scheme options.
Expand Down

0 comments on commit 7abefb5

Please sign in to comment.