From 7abefb59054408d4463bca287a3b955a08d54b61 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Wed, 5 Feb 2025 19:28:00 +0530 Subject: [PATCH] Fix #19634: use system.query.history table for lineage - databricks --- .../source/database/databricks/connection.py | 9 +++- .../source/database/databricks/lineage.py | 36 +++++---------- .../source/database/databricks/queries.py | 24 ++++++++++ .../database/databricks/query_parser.py | 26 +++++------ .../source/database/databricks/usage.py | 44 +++---------------- .../source/database/unitycatalog/lineage.py | 5 ++- .../database/unitycatalog/query_parser.py | 7 +++ .../source/database/unitycatalog/usage.py | 43 +++++++++++++++++- .../database/databricksConnection.json | 6 +++ .../database/databricksConnection.ts | 12 ++--- 10 files changed, 127 insertions(+), 85 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/connection.py b/ingestion/src/metadata/ingestion/source/database/databricks/connection.py index 14d3d3e392e9..11c467bc03c0 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/connection.py @@ -41,6 +41,7 @@ from metadata.ingestion.source.database.databricks.client import DatabricksClient from metadata.ingestion.source.database.databricks.queries import ( DATABRICKS_GET_CATALOGS, + DATABRICKS_SQL_STATEMENT_TEST, ) from metadata.utils.constants import THREE_MIN from metadata.utils.logger import ingestion_logger @@ -106,7 +107,13 @@ def test_database_query(engine: Engine, statement: str): engine=connection, statement=DATABRICKS_GET_CATALOGS, ), - "GetQueries": client.test_query_api_access, + "GetQueries": partial( + test_database_query, + engine=connection, + statement=DATABRICKS_SQL_STATEMENT_TEST.format( + query_history=service_connection.queryHistoryTable + ), + ), } return test_connection_steps( diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/lineage.py b/ingestion/src/metadata/ingestion/source/database/databricks/lineage.py index a77cb780e56d..eb4b74b5d495 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/lineage.py @@ -11,12 +11,10 @@ """ Databricks lineage module """ -import traceback -from datetime import datetime -from typing import Iterator -from metadata.generated.schema.type.basic import DateTime -from metadata.generated.schema.type.tableQuery import TableQuery +from metadata.ingestion.source.database.databricks.queries import ( + DATABRICKS_SQL_STATEMENT, +) from metadata.ingestion.source.database.databricks.query_parser import ( DatabricksQueryParserSource, ) @@ -31,23 +29,13 @@ class DatabricksLineageSource(DatabricksQueryParserSource, LineageSource): Databricks Lineage Legacy Source """ - def yield_table_query(self) -> Iterator[TableQuery]: - data = self.client.list_query_history( - start_date=self.start, - end_date=self.end, + sql_stmt = DATABRICKS_SQL_STATEMENT + + filters = """ + AND ( + lower(statement_text) LIKE '%%create%%select%%' + OR lower(statement_text) LIKE '%%insert%%into%%select%%' + OR lower(statement_text) LIKE '%%update%%' + OR lower(statement_text) LIKE '%%merge%%' ) - for row in data or []: - try: - if self.client.is_query_valid(row): - yield TableQuery( - dialect=self.dialect.value, - query=row.get("query_text"), - userName=row.get("user_name"), - startTime=str(row.get("query_start_time_ms")), - endTime=str(row.get("execution_end_time_ms")), - analysisDate=DateTime(datetime.now()), - serviceName=self.config.serviceName, - ) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Error processing query_dict {row}: {exc}") + """ diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py index 732dc79ad685..25cdcedfc105 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py @@ -14,6 +14,30 @@ import textwrap +DATABRICKS_SQL_STATEMENT = textwrap.dedent( + """ + SELECT + statement_type AS query_type, + statement_text AS query_text, + executed_by AS user_name, + start_time AS start_time, + null AS database_name, + null AS schema_name, + end_time AS end_time, + total_duration_ms/1000 AS duration + from {query_history} + WHERE statement_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' + AND statement_text NOT LIKE '/* {{"app": "dbt", %%}} */%%' + AND start_time between to_timestamp('{start_time}') and to_timestamp('{end_time}') + {filters} + LIMIT {result_limit} + """ +) + +DATABRICKS_SQL_STATEMENT_TEST = """ + SELECT statement_text from {query_history} LIMIT 1 +""" + DATABRICKS_VIEW_DEFINITIONS = textwrap.dedent( """ select diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/query_parser.py b/ingestion/src/metadata/ingestion/source/database/databricks/query_parser.py index 00628bfbddaa..c67b06aa30ed 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/query_parser.py @@ -22,7 +22,6 @@ ) from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.database.databricks.client import DatabricksClient from metadata.ingestion.source.database.query_parser_source import QueryParserSource from metadata.utils.logger import ingestion_logger @@ -36,18 +35,6 @@ class DatabricksQueryParserSource(QueryParserSource, ABC): filters: str - def _init_super( - self, - config: WorkflowSource, - metadata: OpenMetadata, - ): - super().__init__(config, metadata, False) - - # pylint: disable=super-init-not-called - def __init__(self, config: WorkflowSource, metadata: OpenMetadata): - self._init_super(config=config, metadata=metadata) - self.client = DatabricksClient(self.service_connection) - @classmethod def create( cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None @@ -61,7 +48,16 @@ def create( ) return cls(config, metadata) - def prepare(self): + def get_sql_statement(self, start_time, end_time): """ - By default, there's nothing to prepare + returns sql statement to fetch query logs. + + Override if we have specific parameters """ + return self.sql_stmt.format( + start_time=start_time, + end_time=end_time, + filters=self.get_filters(), + result_limit=self.source_config.resultLimit, + query_history=self.service_connection.queryHistoryTable, + ) diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/usage.py b/ingestion/src/metadata/ingestion/source/database/databricks/usage.py index 0e5364a465d8..fedbab2da486 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/usage.py @@ -11,12 +11,10 @@ """ Databricks usage module """ -import traceback -from datetime import datetime -from typing import Iterable -from metadata.generated.schema.type.basic import DateTime -from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery +from metadata.ingestion.source.database.databricks.queries import ( + DATABRICKS_SQL_STATEMENT, +) from metadata.ingestion.source.database.databricks.query_parser import ( DatabricksQueryParserSource, ) @@ -31,36 +29,8 @@ class DatabricksUsageSource(DatabricksQueryParserSource, UsageSource): Databricks Usage Source """ - def yield_table_queries(self) -> Iterable[TableQuery]: - """ - Method to yield TableQueries - """ - queries = [] - data = self.client.list_query_history( - start_date=self.start, - end_date=self.end, - ) - for row in data or []: - try: - if self.client.is_query_valid(row): - queries.append( - TableQuery( - dialect=self.dialect.value, - query=row.get("query_text"), - userName=row.get("user_name"), - startTime=str(row.get("query_start_time_ms")), - endTime=str(row.get("execution_end_time_ms")), - analysisDate=DateTime(datetime.now()), - serviceName=self.config.serviceName, - duration=row.get("duration") - if row.get("duration") - else None, - ) - ) - except Exception as err: - logger.debug(traceback.format_exc()) - logger.warning( - f"Failed to process query {row.get('query_text')} due to: {err}" - ) + sql_stmt = DATABRICKS_SQL_STATEMENT - yield TableQueries(queries=queries) + filters = """ + AND statement_type NOT IN ('SHOW', 'DESCRIBE', 'USE') + """ diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index 8f36b033fc10..c4cf92666ba4 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -27,6 +27,7 @@ EntitiesEdge, LineageDetails, ) +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException, Source @@ -111,7 +112,9 @@ def _get_lineage_details( ) ) if col_lineage: - return LineageDetails(columnsLineage=col_lineage) + return LineageDetails( + columnsLineage=col_lineage, source=LineageSource.QueryLineage + ) return None def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/query_parser.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/query_parser.py index 5a6b7933a28d..f2ac03b99f98 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/query_parser.py @@ -44,6 +44,13 @@ class UnityCatalogQueryParserSource( filters: str + def _init_super( + self, + config: WorkflowSource, + metadata: OpenMetadata, + ): + super().__init__(config, metadata, False) + # pylint: disable=super-init-not-called def __init__(self, config: WorkflowSource, metadata: OpenMetadata): self._init_super(config=config, metadata=metadata) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/usage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/usage.py index 7a310f736a3f..c533454be8cb 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/usage.py @@ -11,17 +11,22 @@ """ unity catalog usage module """ +import traceback +from datetime import datetime +from typing import Iterable -from metadata.ingestion.source.database.databricks.usage import DatabricksUsageSource +from metadata.generated.schema.type.basic import DateTime +from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery from metadata.ingestion.source.database.unitycatalog.query_parser import ( UnityCatalogQueryParserSource, ) +from metadata.ingestion.source.database.usage_source import UsageSource from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -class UnitycatalogUsageSource(UnityCatalogQueryParserSource, DatabricksUsageSource): +class UnitycatalogUsageSource(UnityCatalogQueryParserSource, UsageSource): """ UnityCatalog Usage Source @@ -29,3 +34,37 @@ class UnitycatalogUsageSource(UnityCatalogQueryParserSource, DatabricksUsageSour DatabricksUsageSource as both the sources would call the same API for fetching Usage Queries """ + + def yield_table_queries(self) -> Iterable[TableQuery]: + """ + Method to yield TableQueries + """ + queries = [] + data = self.client.list_query_history( + start_date=self.start, + end_date=self.end, + ) + for row in data or []: + try: + if self.client.is_query_valid(row): + queries.append( + TableQuery( + dialect=self.dialect.value, + query=row.get("query_text"), + userName=row.get("user_name"), + startTime=str(row.get("query_start_time_ms")), + endTime=str(row.get("execution_end_time_ms")), + analysisDate=DateTime(datetime.now()), + serviceName=self.config.serviceName, + duration=row.get("duration") + if row.get("duration") + else None, + ) + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Failed to process query {row.get('query_text')} due to: {err}" + ) + + yield TableQueries(queries=queries) diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/databricksConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/databricksConnection.json index cb8346c5b840..22756827f76d 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/databricksConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/databricksConnection.json @@ -64,6 +64,12 @@ "type": "integer", "default": 120 }, + "queryHistoryTable":{ + "title": "Query History Table", + "description": "Table name to fetch the query history.", + "type": "string", + "default": "system.query.history" + }, "connectionOptions": { "title": "Connection Options", "$ref": "../connectionBasicType.json#/definitions/connectionOptions" diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/databricksConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/databricksConnection.ts index 09d254d0284b..a5cf9eb179db 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/databricksConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/databricksConnection.ts @@ -1,5 +1,5 @@ /* - * Copyright 2024 Collate. + * Copyright 2025 Collate. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -10,9 +10,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - - /** +/** * Databricks Connection Config */ export interface DatabricksConnection { @@ -43,7 +41,11 @@ export interface DatabricksConnection { /** * Databricks compute resources URL. */ - httpPath?: string; + httpPath?: string; + /** + * Table name to fetch the query history. + */ + queryHistoryTable?: string; sampleDataStorageConfig?: SampleDataStorageConfig; /** * SQLAlchemy driver scheme options.