chore(eap): write script to send scrubbed data into a gcs bucket #6698

Merged
merged 4 commits on Jan 13, 2025
Changes from all commits
132 changes: 132 additions & 0 deletions snuba/manual_jobs/extract_span_data.py
@@ -0,0 +1,132 @@
from typing import Any, Mapping, Optional

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.manual_jobs import Job, JobLogger, JobSpec


class ExtractSpanData(Job):
    def __init__(self, job_spec: JobSpec) -> None:
        self.__validate_job_params(job_spec.params)
        super().__init__(job_spec)

    def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None:
        assert params
        required_params = [
            "organization_ids",
            "start_timestamp",
            "end_timestamp",
            "table_name",
            "limit",
            "gcp_bucket_name",
            "output_file_path",
            "allowed_keys",
        ]
        for param in required_params:
            assert param in params

        self._organization_ids = params["organization_ids"]
        self._start_timestamp = params["start_timestamp"]
        self._end_timestamp = params["end_timestamp"]
        self._table_name = params["table_name"]
        self._limit = params["limit"]
        self._gcp_bucket_name = params["gcp_bucket_name"]
        self._output_file_path = params["output_file_path"]
        self._allowed_keys = params["allowed_keys"]

    def _generate_spans_query(self) -> str:
        # Columns that should not be scrubbed
        unscrubbed_columns = {
            "span_id",
            "trace_id",
            "parent_span_id",
            "segment_id",
            "is_segment",
            "_sort_timestamp",
            "start_timestamp",
            "end_timestamp",
            "duration_micro",
            "exclusive_time_micro",
            "retention_days",
            "sampling_factor",
            "sampling_weight",
            "sign",
        }

        base_columns = [
            "organization_id",
            "project_id",
            "service",
            "trace_id",
            "span_id",
            "parent_span_id",
            "segment_id",
            "segment_name",
            "is_segment",
            "_sort_timestamp",
            "start_timestamp",
            "end_timestamp",
            "duration_micro",
            "exclusive_time_micro",
            "retention_days",
            "name",
            "sampling_factor",
            "sampling_weight",
            "sign",
        ]

        map_columns: list[str] = []
        for prefix in ["attr_str_", "attr_num_"]:
            map_columns.extend(f"{prefix}{i}" for i in range(20))

        all_columns = base_columns + map_columns

        # We scrub all strings except for the allowed keys.
        # To perform the scrubbing, we generate a salt from the organization_id using sipHash128Reference (a different hash function from the one used for the values, so we never end up storing the salt itself).
        # We then concatenate the salt with the value being hashed and hash the result with BLAKE3.
        scrubbed_columns = []
        for col in all_columns:
            if col in unscrubbed_columns:
                scrubbed_columns.append(col)
            elif col.startswith("attr_num"):
                scrubbed_columns.append(
                    f"mapApply((k, v) -> (if(k in {self._allowed_keys}, k, BLAKE3(concat(sipHash128Reference(organization_id), k))), v), {col}) AS {col}_scrubbed"
                )
            elif col.startswith("attr_str"):
                scrubbed_columns.append(
                    f"mapApply((k, v) -> (if(k in {self._allowed_keys}, k, BLAKE3(concat(sipHash128Reference(organization_id), k))), BLAKE3(concat(sipHash128Reference(organization_id), v))), {col}) AS {col}_scrubbed"
                )
            else:
                scrubbed_columns.append(
                    f"BLAKE3(concat(sipHash128Reference(organization_id), {col})) AS {col}_scrubbed"
                )

query = f"""
SELECT
{', '.join(scrubbed_columns)}
FROM {self._table_name}
WHERE _sort_timestamp BETWEEN toDateTime('{self._start_timestamp}') AND toDateTime('{self._end_timestamp}')
AND organization_id IN {self._organization_ids}
LIMIT {self._limit}
"""

return query

    def execute(self, logger: JobLogger) -> None:
        cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
        connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY)

query = f"""
INSERT INTO FUNCTION gcs('{self._gcp_bucket_name}/{self._output_file_path}',
'CSVWithNames',
'auto',
'gzip'
)
{self._generate_spans_query()}
"""

logger.info("Executing query")
connection.execute(query=query)
logger.info(
f"Data written to GCS bucket: {self._gcp_bucket_name}/{self._output_file_path}"
)
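For reviewers, a minimal sketch (not part of this PR) of how the generated query can be inspected locally. It mirrors the JobSpec signature used in the test below; the job id and parameter values are illustrative, and it assumes the snuba package is importable and that constructing the job has no side effects beyond validating and storing the spec.

from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.extract_span_data import ExtractSpanData

# Illustrative values only; allowed_keys controls which attribute keys are left unhashed.
spec = JobSpec(
    "inspect-query",
    "ExtractSpanData",
    False,
    {
        "organization_ids": [0, 1],
        "start_timestamp": "2025-01-01T00:00:00",
        "end_timestamp": "2025-01-02T00:00:00",
        "table_name": "snuba_test.eap_spans_2_local",
        "limit": 1000,
        "gcp_bucket_name": "test-bucket",
        "output_file_path": "scrubbed_spans_data.csv.gz",
        "allowed_keys": ["sentry.span.op"],
    },
)

# Prints the SELECT that execute() wraps in INSERT INTO FUNCTION gcs(...).
# For example, attr_str_0 renders as:
#   mapApply((k, v) -> (if(k in ['sentry.span.op'], k,
#       BLAKE3(concat(sipHash128Reference(organization_id), k))),
#       BLAKE3(concat(sipHash128Reference(organization_id), v))),
#       attr_str_0) AS attr_str_0_scrubbed
print(ExtractSpanData(spec)._generate_spans_query())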
130 changes: 130 additions & 0 deletions tests/manual_jobs/test_extract_span_data.py
@@ -0,0 +1,130 @@
import random
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping

import pytest

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_status import JobStatus
from snuba.manual_jobs.runner import run_job
from tests.helpers import write_raw_unprocessed_events


def _gen_message(
    dt: datetime,
    organization_id: int,
    measurements: dict[str, dict[str, float]] | None = None,
    tags: dict[str, str] | None = None,
) -> Mapping[str, Any]:
    measurements = measurements or {}
    tags = tags or {}
    return {
        "description": "/api/0/relays/projectconfigs/",
        "duration_ms": 152,
        "event_id": "d826225de75d42d6b2f01b957d51f18f",
        "exclusive_time_ms": 0.228,
        "is_segment": True,
        "data": {
            "sentry.environment": "development",
            "thread.name": "uWSGIWorker1Core0",
            "thread.id": "8522009600",
            "sentry.segment.name": "/api/0/relays/projectconfigs/",
            "sentry.sdk.name": "sentry.python.django",
            "sentry.sdk.version": "2.7.0",
            "my.float.field": 101.2,
            "my.int.field": 2000,
            "my.neg.field": -100,
            "my.neg.float.field": -101.2,
            "my.true.bool.field": True,
            "my.false.bool.field": False,
        },
        "measurements": {
            "num_of_spans": {"value": 50.0},
            "eap.measurement": {"value": random.choice([1, 100, 1000])},
            **measurements,
        },
        "organization_id": organization_id,
        "origin": "auto.http.django",
        "project_id": 1,
        "received": 1721319572.877828,
        "retention_days": 90,
        "segment_id": "8873a98879faf06d",
        "sentry_tags": {
            "category": "http",
            "environment": "development",
            "op": "http.server",
            "platform": "python",
            "sdk.name": "sentry.python.django",
            "sdk.version": "2.7.0",
            "status": "ok",
            "status_code": "200",
            "thread.id": "8522009600",
            "thread.name": "uWSGIWorker1Core0",
            "trace.status": "ok",
            "transaction": "/api/0/relays/projectconfigs/",
            "transaction.method": "POST",
            "transaction.op": "http.server",
            "user": "ip:127.0.0.1",
        },
        "span_id": "123456781234567D",
        "tags": {
            "http.status_code": "200",
            "relay_endpoint_version": "3",
            "relay_id": "88888888-4444-4444-8444-cccccccccccc",
            "relay_no_cache": "False",
            "relay_protocol_version": "3",
            "relay_use_post_or_schedule": "True",
            "relay_use_post_or_schedule_rejected": "version",
            "spans_over_limit": "False",
            "server_name": "blah",
            "color": random.choice(["red", "green", "blue"]),
            "location": random.choice(["mobile", "frontend", "backend"]),
            **tags,
        },
        "trace_id": uuid.uuid4().hex,
        "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)),
        "start_timestamp_precise": dt.timestamp(),
        "end_timestamp_precise": dt.timestamp() + 1,
    }


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
@pytest.mark.skip(reason="can't test writing to GCS")
def test_extract_span_data() -> None:
    BASE_TIME = datetime.utcnow().replace(
        minute=0, second=0, microsecond=0
    ) - timedelta(minutes=180)
    organization_ids = [0, 1]
    spans_storage = get_storage(StorageKey("eap_spans"))
    messages = [
        _gen_message(BASE_TIME - timedelta(minutes=i), organization_id)
        for organization_id in organization_ids
        for i in range(20)
    ]

    write_raw_unprocessed_events(spans_storage, messages)  # type: ignore

    assert (
        run_job(
            JobSpec(
                "jobid",
                "ExtractSpanData",
                False,
                {
                    "organization_ids": [0, 1],
                    "start_timestamp": (BASE_TIME - timedelta(minutes=30)).isoformat(),
                    "end_timestamp": (BASE_TIME + timedelta(hours=24)).isoformat(),
                    "table_name": "snuba_test.eap_spans_2_local",
                    "limit": 1000000,
                    "output_file_path": "scrubbed_spans_data.csv.gz",
                    "gcp_bucket_name": "test-bucket",
                    "allowed_keys": ["sentry.span.op"],
                },
            )
        )
        == JobStatus.FINISHED
    )
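Since the test is skipped because it cannot write to GCS, a hedged sketch of a manual sanity check (not part of this PR): read the export back through ClickHouse's gcs() table function with the same format and compression arguments the job passes on the INSERT side. The bucket and path values are illustrative, and this assumes the ClickHouse nodes have read access to the bucket.

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey

connection = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM).get_query_connection(
    ClickhouseClientSettings.QUERY
)

# Count the rows that landed in the bucket; gcs() here reads the same
# CSVWithNames/gzip output that ExtractSpanData wrote.
result = connection.execute(
    query="SELECT count() FROM gcs('test-bucket/scrubbed_spans_data.csv.gz', 'CSVWithNames', 'auto', 'gzip')"
)
print(result.results)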