Fix/enrichment docs #33012 #33561

Merged 11 commits on Jan 16, 2025
examples/notebooks/beam-ml/bigtable_enrichment_transform.ipynb (34 changes: 29 additions & 5 deletions)
@@ -605,15 +605,39 @@
},
{
"cell_type": "markdown",
"metadata": {
"id": "F-xjiP_pHWZr"
},
"source": [
"metadata": {},
"source": [
"### What is a Cross-Join?\n",
"A cross-join is a Cartesian product operation where each row from one table is combined with every row from another table. It is useful when we want to create all possible combinations of two datasets.\n",
"\n",
"**Example:**\n",
"Table A:\n",
" | A1 | A2 |\n",
" |----|----|\n",
" | 1 | X |\n",
" | 2 | Y |\n",
"\n",
"Table B:\n",
" | B1 | B2 |\n",
" |----|----|\n",
" | 10 | P |\n",
" | 20 | Q |\n",
"\n",
"**Result of Cross-Join:**\n",
" | A1 | A2 | B1 | B2 |\n",
" |----|----|----|----|\n",
" | 1 | X | 10 | P |\n",
" | 1 | X | 20 | Q |\n",
" | 2 | Y | 10 | P |\n",
" | 2 | Y | 20 | Q |\n",
"\n",
"Cross-joins can be computationally expensive for large datasets, so use them judiciously.\n",
"\n",
"By default, the enrichment transform performs a [`cross_join`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment.html#apache_beam.transforms.enrichment.cross_join). This join returns the enriched row with the following fields: `sale_id`, `customer_id`, `product_id`, `quantity`, `price`, and `customer_location`.\n",
"\n",
"To make a prediction when running the ecommerce example, however, the trained model needs the following fields: `product_id`, `quantity`, `price`, `customer_id`, and `customer_location`.\n",
"\n",
"Therefore, to get the required fields for the ecommerce example, design a custom join function that takes two dictionaries as input and returns an enriched row that include these fields."
"Therefore, to get the required fields for the ecommerce example, design a custom join function that takes two dictionaries as input and returns an enriched row that include these fields.\n"
]
},
{
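The custom join called for in the notebook cell above can be a small function that merges the request and lookup-response dictionaries and keeps only the fields the model expects. The following is an illustrative sketch only; the `join_fn` keyword and the `bigtable_handler` name are assumptions rather than text taken from this diff:

```python
from typing import Any, Dict

import apache_beam as beam


def custom_join(request: Dict[str, Any], response: Dict[str, Any]) -> beam.Row:
  """Merges the request and response dicts, keeping only the model's input fields."""
  fields = ['product_id', 'quantity', 'price', 'customer_id', 'customer_location']
  merged = {**request, **response}  # sale_id and any other unused keys are dropped below
  return beam.Row(**{field: merged[field] for field in fields})


# Hypothetical usage (the handler variable and the join_fn keyword are assumptions):
# enriched = sales | Enrichment(bigtable_handler, join_fn=custom_join)
```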
@@ -46,6 +46,55 @@ The following examples demonstrate how to create a pipeline that uses the enrichment transform
| Vertex AI Feature Store (Legacy) | [Enrichment with Legacy Vertex AI Feature Store](/documentation/transforms/python/elementwise/enrichment-vertexai/#example-2-enrichment-with-vertex-ai-feature-store-legacy) |
{{< /table >}}

## BigQuery Support

The enrichment transform supports integration with **BigQuery** to dynamically enrich data using BigQuery datasets. By leveraging BigQuery as an external data source, users can execute efficient lookups for data enrichment directly in their Apache Beam pipelines.

To use BigQuery for enrichment:
- Configure your BigQuery table as the data source for the enrichment process.
- Ensure your pipeline has the appropriate credentials and permissions to access the BigQuery dataset.
- Specify the query to extract the data to be used for enrichment.

This integration is particularly beneficial for use cases that require augmenting real-time streaming data with information stored in BigQuery.
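A minimal sketch of what this can look like, assuming the `BigQueryEnrichmentHandler` available under `apache_beam.transforms.enrichment_handlers.bigquery` in recent Beam releases; the project, table, and field names are illustrative, and the exact constructor arguments should be checked against the pydoc for your Beam version:

```python
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler

# Illustrative names only: swap in your own project, table, and key column.
bigquery_handler = BigQueryEnrichmentHandler(
    project='my-gcp-project',
    table_name='my-gcp-project.my_dataset.customers',
    row_restriction_template="customer_id = '{}'",
    fields=['customer_id'],
)

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | 'Create sales' >> beam.Create([beam.Row(customer_id='C1', sale_id='S1')])
      | 'Enrich from BigQuery' >> Enrichment(bigquery_handler))
```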

---

## Batching

To optimize requests to external services, the enrichment transform uses batching. Instead of performing a lookup for each individual element, the transform groups multiple elements into a batch and performs a single lookup for the entire batch.

### Advantages of Batching:
- **Improved Throughput**: Reduces the number of network calls.
- **Lower Latency**: Fewer round trips to the external service.
- **Cost Optimization**: Minimizes API call costs when working with paid external services.

Users can configure the batch size by specifying parameters in their pipeline setup. Adjusting the batch size can help fine-tune the balance between throughput and latency.
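Concretely, batch sizes are usually set on the enrichment handler itself. The sketch below assumes the `min_batch_size` and `max_batch_size` parameters accepted by the BigQuery handler in recent Beam releases; other handlers may expose different options, so treat the names and values as illustrative:

```python
from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler

# Group between 8 and 100 input elements per lookup instead of one request per element.
batched_handler = BigQueryEnrichmentHandler(
    project='my-gcp-project',
    table_name='my-gcp-project.my_dataset.customers',
    row_restriction_template="customer_id = '{}'",
    fields=['customer_id'],
    min_batch_size=8,
    max_batch_size=100,
)
```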

---

## Caching with `with_redis_cache`

For frequently used enrichment data, caching can significantly improve performance by reducing repeated calls to the remote service. Apache Beam's [`with_redis_cache`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment.html#apache_beam.transforms.enrichment.Enrichment.with_redis_cache) method allows you to integrate a Redis cache into the enrichment pipeline.

### Benefits of Caching:
- **Reduced Latency**: Fetches enrichment data from the cache instead of making network calls.
- **Improved Resilience**: Minimizes the impact of network outages or service downtimes.
- **Scalability**: Handles large volumes of enrichment requests efficiently.

To enable caching:
1. Set up a Redis instance accessible by your pipeline.
2. Use the `with_redis_cache` method to configure the cache in your enrichment transform.
3. Specify the time-to-live (TTL) for cache entries to ensure data freshness.

Example:
```python
from apache_beam.transforms.enrichment import Enrichment

# Enrichment pipeline with a Redis cache in front of the remote lookup.
# `input_data`, `my_enrichment_handler`, `host`, and `port` are placeholders
# defined elsewhere in the pipeline.
enriched_data = (
    input_data
    | 'Enrich with Cache' >> Enrichment(my_enrichment_handler).with_redis_cache(host, port))
```

## Related transforms

Not applicable.