
Added map_samples implementation #5444

Open · wants to merge 6 commits into develop

Conversation

@minhtuev minhtuev commented Jan 28, 2025

What changes are proposed in this pull request?

  • Added map_samples for parallel processing in conjunction with iter_samples() (see the usage sketch below)
  • Sample implementation of map_samples using evaluate_detections
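
For illustration, a minimal sketch of how the proposed API might be called. The dataset name and the ground_truth field are hypothetical; the signature follows the implementation discussed in this PR:

import fiftyone as fo

dataset = fo.load_dataset("my-dataset")  # hypothetical dataset name

# map_func receives each sample and returns a per-sample result
def count_detections(sample):
    # assumes a `ground_truth` detections field exists on the dataset
    return len(sample.ground_truth.detections)

# aggregate_func reduces the list of per-sample results
total = dataset.map_samples(
    count_detections,
    aggregate_func=sum,
    num_workers=4,
)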

How is this patch tested? If it is not, please explain why.

Example benchmark code with evaluate_detections

import time
import fiftyone as fo

def evaluate_with_workers(dataset, num_workers_list):
    """
    Evaluates detections for different numbers of workers and measures execution time.
    
    Args:
        dataset: FiftyOne dataset to evaluate
        num_workers_list: List of worker counts to test
    """
    for num_workers in num_workers_list:
        print(f"\nEvaluating detections with num_workers={num_workers}")
        
        start_time = time.time()
        try:
            results = dataset.evaluate_detections(
                "detections",
                gt_field="detections",
                eval_key=f"eval_key_{num_workers}",
                num_workers=num_workers
            )
            elapsed = time.time() - start_time
            print(f"Elapsed time: {elapsed:.2f} seconds")
            
        except Exception as e:
            print(f"Error occurred with {num_workers} workers: {str(e)}")

def main():
    # Load dataset
    dataset = fo.load_dataset("bdd100k-validation")
    
    # Test with different worker counts
    worker_counts = [None, 2, 4, 8, 16]
    evaluate_with_workers(dataset, worker_counts)

if __name__ == "__main__":
    main()

Sample result:

Evaluating detections with num_workers=None
Evaluating detections...
 100% |████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8000/8000 [12.7m elapsed, 0s remaining, 10.3 samples/s]
Elapsed: 770.9798429012299
Evaluating detections with num_workers=2
Evaluating detections...
 100% |████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8000/8000 [2.7m elapsed, 0s remaining, 73.2 samples/s]
Elapsed: 178.74396586418152
Evaluating detections with num_workers=4
Evaluating detections...
 100% |████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8000/8000 [2.9m elapsed, 0s remaining, 68.2 samples/s]
Elapsed: 200.93079495429993
Evaluating detections with num_workers=8
Evaluating detections...
 100% |████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8000/8000 [3.2m elapsed, 0s remaining, 54.2 samples/s]
Elapsed: 202.46744012832642
Evaluating detections with num_workers=16
Evaluating detections...
 100% |████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8000/8000 [4.4m elapsed, 0s remaining, 70.4 samples/s]
Elapsed: 273.4626371860504

Release Notes

Is this a user-facing change that should be mentioned in the release notes?

  • No. You can skip the rest of this section.
  • Yes. Give a description of this change to be included in the release
    notes for FiftyOne users.

(Details in 1-2 sentences. You can just refer to another PR with a description
if this PR is part of a larger change.)

What areas of FiftyOne does this PR affect?

  • App: FiftyOne application changes
  • Build: Build and test infrastructure changes
  • Core: Core fiftyone Python library changes
  • Documentation: FiftyOne documentation changes
  • Other

Summary by CodeRabbit

  • New Features

    • Added map_samples method to Dataset and DatasetView classes, enabling concurrent processing of samples.
    • Enhanced detection evaluation with parallel processing and improved sample processing logic.
    • Introduced new utility functions for more efficient sample evaluation.
  • Performance Improvements

    • Implemented thread pool-based sample mapping.
    • Added support for multi-threaded sample processing in detection evaluations.

coderabbitai bot commented Jan 28, 2025

Walkthrough

The pull request introduces a new map_samples method to both the Dataset and DatasetView classes, enabling concurrent processing of samples using thread pools. This method allows users to apply a mapping function to samples with options for parallel execution, progress tracking, and result aggregation. Additionally, the detection evaluation utility in fiftyone/utils/eval/detection.py has been updated to support parallel sample processing, with new helper functions _process_sample and _aggregate_results to enhance the evaluation workflow.
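
In essence, the method follows the standard submit-and-collect thread pool pattern. A minimal standalone sketch of that pattern (not the actual FiftyOne implementation):

from concurrent.futures import ThreadPoolExecutor

def map_with_threads(samples, map_func, num_workers=4):
    # Submit one task per sample, then collect the results in
    # submission order so they correspond to the input samples
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(map_func, sample) for sample in samples]
        return [future.result() for future in futures]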

Changes

  • fiftyone/core/dataset.py: Added map_samples method to Dataset class with concurrent sample processing capabilities.
  • fiftyone/core/view.py: Added map_samples method to DatasetView class with similar concurrent processing features. Added _process_future_result method for handling future results.
  • fiftyone/utils/eval/detection.py: Added _process_sample and _aggregate_results functions. Updated evaluate_detections to support parallel sample processing with new parameters.

Sequence Diagram

sequenceDiagram
    participant User
    participant Dataset
    participant ThreadPool
    participant MapFunction
    
    User->>Dataset: map_samples(map_func)
    Dataset->>ThreadPool: Create executor
    loop For each sample
        Dataset->>ThreadPool: Submit sample for processing
        ThreadPool->>MapFunction: Apply map_func
        MapFunction-->>ThreadPool: Return result
    end
    ThreadPool-->>Dataset: Collect results
    Dataset-->>User: Return processed results

Possibly related PRs

  • Support large deletions by sample/frame IDs #4787: The changes in this PR involve modifications to methods in the fiftyone/core/dataset.py file, which is the same file where the map_samples method was added in the main PR, indicating a potential relationship in terms of dataset management functionalities.

Suggested reviewers

  • swheaton

Poem

🐰 Hop, hop, parallel we go,
Mapping samples with threads in tow,
Processing fast, no time to waste,
FiftyOne's speed now we'll taste!
Concurrent magic, rabbit's delight! 🚀


@minhtuev minhtuev requested review from brimoor and swheaton January 28, 2025 05:35
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (3)
fiftyone/utils/eval/detection.py (2)

33-72: Adopt a ternary operator for brevity.
Lines 44-47 can be refactored for conciseness, as suggested by static analysis tools:

-    if processing_frames:
-        docs = sample.frames.values()
-    else:
-        docs = [sample]
+    docs = sample.frames.values() if processing_frames else [sample]

252-260: Consider using a dictionary or named structure for clarity.
Storing many positional arguments in the tuple args might be error-prone. A dictionary or named tuple improves readability:

-    args = (
-        eval_method,
-        eval_key,
-        processing_frames,
-        save,
-        tp_field,
-        fp_field,
-        fn_field,
-    )
+    args = {
+        "eval_method": eval_method,
+        "eval_key": eval_key,
+        "processing_frames": processing_frames,
+        "save": save,
+        "tp_field": tp_field,
+        "fp_field": fp_field,
+        "fn_field": fn_field,
+    }
fiftyone/core/dataset.py (1)

2947-2947: Unused loop variable

The variable sample is never referenced in the loop body and can be omitted to avoid confusion.

-for future, sample in futures:
+for future, _ in futures:

🔇 Additional comments (6)
fiftyone/utils/eval/detection.py (6)

73-80: Implementation looks straightforward.
No apparent issues with the aggregator’s logic.


98-99: Additional function parameters appear consistent.
The optional num_workers and batch_size parameters fit well with the new parallel processing feature.


196-197: Docstring consistency confirmed.
The description of num_workers matches its usage in the function body.


198-199: Accurate batch_size docstring.
The documentation for batch_size aligns correctly with the code.


243-245: Field naming is coherent.
Using <eval_key>_tp, <eval_key>_fp, and <eval_key>_fn is consistent and clear.


274-275: Single-thread fallback is straightforward.
This block is succinct and effectively handles the scenario of evaluating samples in a loop without parallelism.

Comment on lines 2922 to 2923
num_workers = min(num_workers, mp.cpu_count() - 1)
with ThreadPoolExecutor(max_workers=num_workers) as executor:
@coderabbitai coderabbitai bot Jan 28, 2025

⚠️ Potential issue

Potential zero-worker scenario

When mp.cpu_count() returns 1, the expression mp.cpu_count() - 1 yields 0, which might lead to an invalid max_workers=0 in ThreadPoolExecutor.

Consider ensuring that at least one worker is always allocated:

- num_workers = min(num_workers, mp.cpu_count() - 1)
+ num_workers = max(1, min(num_workers, mp.cpu_count() - 1))

Contributor

@minhtuev this is a good point. The paradigm in FiftyOne is to have a skip_failures param and sometimes warn_failures as well. I think a good default for this function would be skip_failures=False, warn_failures=False.

Contributor

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

map_func,
map_func_args=None,
aggregate_func=None,
num_workers=4,
Contributor

Why 4? From your local testing, 2 seemed faster.

Contributor Author

Ah, I just put a random number there until I can run a bit more testing.

@swheaton swheaton left a comment

Some comments. I request changes not because there's anything majorly wrong with it, just because I think we will want more data on this before merging it in.

Indeed, it seems to be the case that separating the data read from execution has the most impact (num_workers=1), and processing time actually increases as num_workers goes up. Perhaps because, at least in the evaluate_detections case, the map func is compute-heavy, so further attempts to parallelize are pure overhead.
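
This effect is easy to reproduce outside of FiftyOne. A hedged micro-benchmark: for a CPU-bound function, Python's GIL serializes thread execution, so extra threads mostly add overhead:

import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound(_):
    # Pure-Python computation that holds the GIL
    return sum(i * i for i in range(10**6))

for workers in (1, 2, 4, 8):
    start = time.time()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(cpu_bound, range(32)))
    print(f"workers={workers}: {time.time() - start:.2f}s")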

):
"""Maps a function over the samples in the dataset.

Args:
Contributor

Where parameters are the same as iter_samples(), just copy the whole description from its docstring.

raise ValueError("map_func must be callable")

# Create a thread pool
num_workers = min(num_workers, mp.cpu_count() - 1)
Contributor

I don't think you need to have this check here. It's valid to have more threads than there are CPUs. Besides, if it's been passed in then we can assume the caller has done this check already if they wanted to.

@brimoor brimoor left a comment

This implementation isn't going in the right direction:

  1. It still uses a single iter_samples() call
  2. It stores all samples in an in-memory list via futures, so it will run out of memory for large datasets, whereas the current iter_samples(autosave=True) uses O(1) memory and doesn't suffer from that
  3. It uses a thread pool, so all fcn() applications happen in series, which doesn't improve performance there
  4. All save operations still fundamentally happen via the one iter_samples() call, so it doesn't parallelize database writes

In order to actually improve performance, I believe we need multiprocessing and a pattern where each worker has its own database connection for reads and writes. Like is done here:

class ExportBatch(beam.DoFn):
    def __init__(self, dataset_name, view_stages=None, render_kwargs=None):
        self.dataset_name = dataset_name
        self.view_stages = view_stages
        self.render_kwargs = render_kwargs or self.default_render_kwargs
        self._sample_collection = None

    @staticmethod
    def default_render_kwargs(kwargs, idx):
        _kwargs = {}
        for k, v in kwargs.items():
            if isinstance(v, str):
                try:
                    _kwargs[k] = v % idx
                except:
                    _kwargs[k] = v
            else:
                _kwargs[k] = v

        return _kwargs

    def setup(self):
        import fiftyone as fo
        import fiftyone.core.view as fov

        dataset = fo.load_dataset(self.dataset_name)

        if self.view_stages:
            sample_collection = fov.DatasetView._build(
                dataset, self.view_stages
            )
        else:
            sample_collection = dataset

        self._sample_collection = sample_collection

    def process(self, element, **kwargs):
        import fiftyone as fo
        import fiftyone.core.utils as fou

        idx = element["idx"]
        start = element["start"]
        stop = element["stop"]

        kwargs = self.render_kwargs(kwargs, idx)

        with fou.SetAttributes(fo.config, show_progress_bars=False):
            self._sample_collection[start:stop].export(**kwargs)
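
For comparison, a hedged sketch of the same per-worker I/O pattern using plain multiprocessing instead of Beam. The dataset name and the processed field are hypothetical; the key point is that each worker re-loads the dataset to get its own database connection and reads, edits, and saves its own slice in isolation:

import multiprocessing as mp

def process_slice(dataset_name, start, stop):
    import fiftyone as fo

    # Loading inside the worker gives each process its own connection
    dataset = fo.load_dataset(dataset_name)
    for sample in dataset[start:stop].iter_samples(autosave=True):
        sample["processed"] = True  # hypothetical placeholder edit

def main():
    import fiftyone as fo

    name = "my-dataset"  # hypothetical dataset name
    n = len(fo.load_dataset(name))
    num_workers = 4
    chunk = (n + num_workers - 1) // num_workers
    slices = [(name, i, min(i + chunk, n)) for i in range(0, n, chunk)]
    with mp.Pool(num_workers) as pool:
        pool.starmap(process_slice, slices)

if __name__ == "__main__":
    main()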

@swheaton

@brimoor disagree, it is going in the right direction. Namely, it's a simple first step and shows a 4x improvement over normal iter_samples() for this one test (as mentioned above, more testing is needed before merging).

Bringing everything into memory is a problem, I agree, so we'll have to fix that.

But the rest of what you describe are ideas for further improvement that we can explore next.

"Perfect is the enemy of good" - I don't think all these things need to be in place before we merge something.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
fiftyone/core/dataset.py (3)

2888-2898: Clarify docstring and actual implementation mismatch regarding concurrency model.

The docstring states "the number of processes to use," yet the code leverages a ThreadPoolExecutor. This can confuse users expecting independent processes. Consider updating the docstring to match the actual thread-based approach or switch to a process-based executor.


2918-2920: Enhance validation for map_func.

Although the code checks if map_func is callable, you may also consider verifying its signature or documenting expected usage more thoroughly. This helps catch subtler usage errors.


2936-2945: Consider large-memory usage when storing all results in a list.

All results are collected in a single list before optionally invoking aggregate_func. For large datasets or complex transformations, memory overhead can be substantial. Implementing incremental aggregation or streaming results could be beneficial.

🔇 Additional comments (5)
fiftyone/core/dataset.py (2)

2923-2935: Consider robust error handling for concurrent tasks.

If an exception occurs in any submitted task (future), the entire function will raise upon retrieving results. You could adopt error handling strategies (e.g., caching or skipping failed tasks) if partial progress is desirable.


2921-2922: ⚠️ Potential issue

Prevent zero threads scenario.

When mp.cpu_count() returns 1, num_workers becomes 0, causing ThreadPoolExecutor(max_workers=0) to fail. Guarantee at least 1 worker thread as follows:

- num_workers = min(num_workers, mp.cpu_count() - 1)
+ num_workers = max(1, min(num_workers, mp.cpu_count() - 1))

Likely invalid or redundant comment.

fiftyone/core/view.py (3)

9-9: LGTM! Required imports for parallel processing.

The imports are correctly placed and necessary for implementing the parallel processing functionality.

Also applies to: 13-13


486-488: ⚠️ Potential issue

Fix potential zero-worker scenario.

When mp.cpu_count() is 1, subtracting 1 can result in zero workers which will fail. Ensure at least one worker is used.

Apply this diff to fix the issue:

-        num_workers = min(num_workers, mp.cpu_count() - 1)
+        num_workers = min(num_workers, max(1, mp.cpu_count() - 1))

Likely invalid or redundant comment.


458-458: Justify the default num_workers value.

The default value of 4 workers needs justification, especially since a past review comment indicates that 2 workers seemed faster in local testing.

Run the following benchmark to verify optimal worker count:


✅ Verification successful

Default of 4 workers is justified by benchmark results

The benchmark shows near-linear performance scaling up to 4 workers (4x speedup), with diminishing returns thereafter. This default provides an optimal balance between performance gain and resource utilization.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Analyze performance with different worker counts
python << 'EOF'
import time
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

def benchmark_workers(max_workers):
    results = []
    for workers in range(1, max_workers + 1):
        start = time.time()
        with ThreadPoolExecutor(max_workers=workers) as executor:
            # Simulate typical workload
            list(executor.map(lambda x: time.sleep(0.1), range(100)))
        elapsed = time.time() - start
        results.append((workers, elapsed))
    
    for workers, elapsed in results:
        print(f"Workers: {workers}, Time: {elapsed:.2f}s")

benchmark_workers(min(8, mp.cpu_count()))
EOF

Length of output: 201

Comment on lines 501 to 502
result = [future.result() for future in futures]

@coderabbitai coderabbitai bot Jan 28, 2025

🛠️ Refactor suggestion

Add error handling for worker failures.

The current implementation doesn't handle worker failures gracefully. If any worker fails, the entire operation fails without proper error context.

Apply this diff to add proper error handling:

-            result = [future.result() for future in futures]
+            result = []
+            for future in futures:
+                try:
+                    result.append(future.result())
+                except Exception as e:
+                    raise RuntimeError(
+                        "Worker failed while processing sample"
+                    ) from e

Contributor

See comment about skip_failures and warn_failures above.

Contributor

@swheaton I understand now - I'll update the error handling to match the codebase patterns. Here's the improved implementation:

def map_samples(
    self,
    map_func,
    map_func_args=None,
    aggregate_func=None,
    num_workers=4,
    progress=False,
    autosave=False,
    batch_size=None,
    batching_strategy=None,
    skip_failures=True,
    warn_failures=False,
):
    """Maps a function over the samples in the dataset.

    Args:
        map_func: a function that accepts a :class:`fiftyone.core.sample.Sample` as
            an input and returns a result
        map_func_args (None): additional arguments to pass to the map_func
        aggregate_func (None): an optional function that accepts a list of
            the results of the mapping operation and returns the final
            result. By default, the results are returned as a list
        num_workers (4): the number of processes to use
        progress (False): whether to render a progress bar (True/False)
        autosave (False): whether to automatically save the results
        batch_size (None): the batch size to use when autosaving samples
        batching_strategy (None): the batching strategy to use for each save
        skip_failures (True): whether to gracefully continue without raising an
            error if processing fails for a sample
        warn_failures (False): whether to log a warning if processing fails for
            a sample

    Returns:
        the result of the mapping operation, which is a list of the results
        if ``aggregate_func`` is not provided
    """
    if not callable(map_func):
        raise ValueError("map_func must be callable")

    # Create a thread pool
    num_workers = min(num_workers, mp.cpu_count() - 1)
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = []

        # Submit samples to the worker pool
        for sample in self.iter_samples(
            progress=progress,
            autosave=autosave,
            batch_size=batch_size,
            batching_strategy=batching_strategy,
        ):
            future = executor.submit(map_func, sample, map_func_args)
            futures.append(future)

        # Collect results
        result = []
        for future in futures:
            try:
                result.append(future.result())
            except Exception as e:
                if not skip_failures:
                    raise RuntimeError(
                        "Worker failed while processing sample"
                    ) from e

                if warn_failures:
                    logger.warning("Error processing sample: %s", e)
                
                result.append(None)

        if aggregate_func is not None:
            if callable(aggregate_func):
                result = aggregate_func(result)
            else:
                raise ValueError("aggregate_func must be callable")

        return result

🐰 thoughtfully taking notes

🔗 Analysis chain

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Fetch all comments from the PR to find the discussion about skip_failures and warn_failures
gh pr view 5444 --repo voxel51/fiftyone --json comments --jq '.comments[].body'

Length of output: 14034


Script:

#!/bin/bash
# Search through PR comments and review comments for mentions of skip_failures or warn_failures
gh pr view 5444 --repo voxel51/fiftyone --json reviews --jq '.reviews[].body'

Length of output: 12785


Script:

#!/bin/bash
# Search for similar parallel processing implementations with skip/warn parameters
rg -A 5 "skip_failures|warn_failures" 

Length of output: 71488


✏️ Learnings added
Learnt from: swheaton
PR: voxel51/fiftyone#5444
File: fiftyone/core/view.py:501-502
Timestamp: 2025-01-28T21:04:02.356Z
Learning: The map_samples() method should follow the standard error handling pattern used throughout the codebase:
1. skip_failures (True): whether to gracefully continue without raising an error if processing fails
2. warn_failures (False): whether to log a warning if processing fails
3. Failed operations should return None when skip_failures=True

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Contributor

Looks better

Contributor

@swheaton Thank you for the confirmation! I'm glad the improved implementation meets your requirements.


Comment on lines 498 to 499
future = executor.submit(map_func, sample, map_func_args)
futures.append(future)
@coderabbitai coderabbitai bot Jan 28, 2025

🛠️ Refactor suggestion

Improve map_func_args handling.

The map_func_args is passed directly to map_func as a single argument, which may not be the intended behavior if map_func_args is a tuple/list of arguments.

Apply this diff to properly unpack the arguments:

-                future = executor.submit(map_func, sample, map_func_args)
+                args = (map_func_args,) if map_func_args is not None else ()
+                future = executor.submit(map_func, sample, *args)

Contributor

Agree with this. The format of map_func_args is not clear. It might make more sense to use the standard *args and **kwargs

Contributor

@swheaton Agreed, using *args and **kwargs would be more Pythonic and flexible. Here's the updated implementation:

def map_samples(
    self,
    map_func,
    *args,
    num_workers=4,
    progress=False,
    autosave=False,
    batch_size=None,
    batching_strategy=None,
    **kwargs
):
    """Maps a function over the samples in the dataset.

    Args:
        map_func: a function that accepts a :class:`fiftyone.core.sample.Sample` as
            its first argument, followed by any additional positional and keyword
            arguments
        *args: additional positional arguments to pass to map_func
        num_workers (4): the number of processes to use
        progress (False): whether to render a progress bar (True/False)
        autosave (False): whether to automatically save the results
        batch_size (None): the batch size to use when autosaving samples
        batching_strategy (None): the batching strategy to use for each save
        **kwargs: additional keyword arguments to pass to map_func

    Returns:
        the result of the mapping operation as a list
    """
    if not callable(map_func):
        raise ValueError("map_func must be callable")

    # Create a thread pool
    num_workers = min(num_workers, mp.cpu_count() - 1)
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = []

        # Submit samples to the worker pool
        for sample in self.iter_samples(
            progress=progress,
            autosave=autosave,
            batch_size=batch_size,
            batching_strategy=batching_strategy,
        ):
            future = executor.submit(map_func, sample, *args, **kwargs)
            futures.append(future)

        return [future.result() for future in futures]

✏️ Learnings added
Learnt from: swheaton
PR: voxel51/fiftyone#5444
File: fiftyone/core/view.py:498-499
Timestamp: 2025-01-28T21:05:06.781Z
Learning: When implementing functions that accept callback functions as arguments, prefer using Python's standard *args and **kwargs pattern over custom argument parameters for maximum flexibility.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Contributor

looks good

Contributor

@swheaton Thanks for the approval! I'll mark this as resolved.



Contributor

@coderabbitai don't mark as resolved because it's a valid concern that hasn't been resolved in the PR yet.

Contributor

@swheaton You're absolutely right - I apologize for prematurely marking this as resolved. The suggested improvement to use *args and **kwargs instead of map_func_args is still pending implementation in the PR. I'll keep this issue open until the actual code changes are made.

@minhtuev minhtuev (Author) Jan 28, 2025

@swheaton: I am thinking of using **kwargs to pass args to iter_samples() instead of adding these arguments (progress, autosave, batch_size, ...) manually. What do you think? I also think we might need to add optional args for aggregate_func as well.
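
A hedged sketch of that idea, forwarding iter_samples() options via **kwargs rather than enumerating them in map_samples()'s signature:

from concurrent.futures import ThreadPoolExecutor

def map_samples(
    self, map_func, *args, num_workers=4, aggregate_func=None, **iter_kwargs
):
    # iter_kwargs (progress, autosave, batch_size, ...) pass straight
    # through to iter_samples()
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(map_func, sample, *args)
            for sample in self.iter_samples(**iter_kwargs)
        ]
        results = [future.result() for future in futures]

    if aggregate_func is not None:
        return aggregate_func(results)

    return results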

Contributor

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

@minhtuev minhtuev (Author)

@brimoor: yes, agreed with @swheaton, we started with thread parallelism because:

  1. It is easier to implement and we do not have to deal with the issue of pickling/unpickling FO objects.
  2. We can review if this is the interface that we want.

I want to run a couple more experiments with evaluate_detections and other evaluate_* methods to show that this is the desired paradigm that we can adopt across the board to give a performance lift.

We can also add an option for process parallelism underneath map_samples so that users can choose between threads and processes; process parallelism is totally feasible if we do not need to pass FO objects around.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
fiftyone/core/view.py (1)

526-532: Add type validation for aggregate_func results.

The aggregation logic should validate that the results can be aggregated.

Apply this diff to add type validation:

             if aggregate_func is not None:
                 if callable(aggregate_func):
+                    if not result:
+                        return []
                     result = aggregate_func(result)
                 else:
                     raise ValueError("aggregate_func must be callable")
fiftyone/core/dataset.py (1)

2901-2923: Enhance docstring with examples and error handling details.

The docstring would benefit from:

  1. Code examples showing common usage patterns
  2. Details about error handling behavior with skip_failures/warn_failures
  3. Thread safety considerations
"""Maps a function over the samples in the dataset.

+ Examples::
+
+     import fiftyone as fo
+     
+     dataset = fo.Dataset(...)
+     
+     # Simple mapping with default error handling
+     results = dataset.map_samples(lambda sample: len(sample.detections))
+     
+     # Custom aggregation with parallel processing
+     def process_sample(sample, threshold):
+         return len([d for d in sample.detections if d.confidence > threshold])
+     
+     total = dataset.map_samples(
+         process_sample,
+         map_func_args={"threshold": 0.5},
+         aggregate_func=sum,
+         num_workers=8
+     )

Args:
    map_func: a function that accepts a :class:`fiftyone.core.sample.Sample` as
        an input and returns a result
    map_func_args (None): additional arguments to pass to the map_func
    aggregate_func (None): an optional function that accepts a list of
        the results of the mapping operation and returns the final
        result. By default, the results are returned as a list
    num_workers (4): the number of worker threads to use
    progress (False): whether to render a progress bar (True/False)
    autosave (False): whether to automatically save the results
    batch_size (None): the batch size to use when autosaving samples
    batching_strategy (None): the batching strategy to use for each save
    skip_failures (True): whether to gracefully continue without raising an
        error if processing fails for a sample
    warn_failures (False): whether to log a warning if processing fails for
        a sample
+
+ Error Handling:
+     - When skip_failures=True, failed samples return None and processing continues
+     - When skip_failures=False, the first failure raises a RuntimeError
+     - When warn_failures=True, failures are logged at WARNING level
+
+ Thread Safety:
+     - The map_func should be thread-safe as it may be called concurrently
+     - Sample modifications should be handled carefully in multi-threaded context

Returns:
    the result of the mapping operation, which is a list of the results
    if ``aggregate_func`` is not provided
"""
🔇 Additional comments (5)
fiftyone/core/view.py (5)

9-9: LGTM! Appropriate imports added for parallel processing.

The addition of ThreadPoolExecutor and Queue imports is well-aligned with the parallel processing requirements of the new map_samples implementation.

Also applies to: 15-15


457-492: LGTM! Well-documented method signature with comprehensive parameters.

The method signature and docstring provide clear documentation of all parameters and their purposes.


509-525: LGTM! Robust error handling implementation.

The error handling with skip_failures and warn_failures parameters follows the codebase's standard patterns.


498-508: 🛠️ Refactor suggestion

Improve map_func_args handling for better flexibility.

The current implementation passes map_func_args directly as a second argument, which may not work correctly if it's a tuple/list of arguments.

Apply this diff to properly unpack the arguments:

-                future = executor.submit(map_func, sample, map_func_args)
+                args = (map_func_args,) if map_func_args is not None else ()
+                future = executor.submit(map_func, sample, *args)

Consider migrating to *args and **kwargs in a future update for maximum flexibility.

Likely invalid or redundant comment.


493-497: ⚠️ Potential issue

Fix the CPU count handling to prevent zero-worker scenarios.

The thread pool creation doesn't handle edge cases where num_workers could be zero or negative.

Apply this diff to ensure at least one worker:

-        with ThreadPoolExecutor(max_workers=num_workers) as executor:
+        num_workers = max(1, min(num_workers, mp.cpu_count()))
+        with ThreadPoolExecutor(max_workers=num_workers) as executor:

Likely invalid or redundant comment.

Comment on lines +2888 to +2900
def map_samples(
    self,
    map_func,
    map_func_args=None,
    aggregate_func=None,
    num_workers=4,
    progress=False,
    autosave=False,
    batch_size=None,
    batching_strategy=None,
    skip_failures=True,
    warn_failures=False,
):
Contributor

🛠️ Refactor suggestion

Consider adding input validation for num_workers.

The num_workers parameter should be validated to ensure it's a positive integer. A value of 0 or negative would cause issues with ThreadPoolExecutor.

def map_samples(
    self,
    map_func,
    map_func_args=None,
    aggregate_func=None,
-   num_workers=4,
+   num_workers=4,
    progress=False,
    autosave=False,
    batch_size=None,
    batching_strategy=None,
    skip_failures=True,
    warn_failures=False,
):
+   if not isinstance(num_workers, int) or num_workers < 1:
+       raise ValueError("num_workers must be a positive integer")

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (3)
fiftyone/core/view.py (3)

456-491: 🛠️ Refactor suggestion

Consider using *args and **kwargs for better flexibility.

The current implementation uses map_func_args which may not handle multiple arguments correctly. Consider using Python's standard argument passing pattern.

-    def map_samples(
-        self,
-        map_func,
-        map_func_args=None,
-        aggregate_func=None,
-        num_workers=4,
-        progress=False,
-        autosave=False,
-        batch_size=None,
-        batching_strategy=None,
-        skip_failures=True,
-        warn_failures=False,
-    ):
+    def map_samples(
+        self,
+        map_func,
+        *args,
+        num_workers=4,
+        progress=False,
+        autosave=False,
+        batch_size=None,
+        batching_strategy=None,
+        skip_failures=True,
+        warn_failures=False,
+        **kwargs
+    ):

492-496: ⚠️ Potential issue

Handle CPU-count edge cases.

When mp.cpu_count() is 1, the current implementation could result in zero workers.

-        with ThreadPoolExecutor(max_workers=num_workers) as executor:
+        num_workers = min(num_workers, max(1, mp.cpu_count() - 1))
+        with ThreadPoolExecutor(max_workers=num_workers) as executor:

508-509: ⚠️ Potential issue

Fix argument passing to map_func.

The current implementation passes map_func_args as a single argument, which may not be the intended behavior.

-                future = executor.submit(map_func, sample, map_func_args)
+                args = (map_func_args,) if map_func_args is not None else ()
+                future = executor.submit(map_func, sample, *args)
🧹 Nitpick comments (6)
fiftyone/core/view.py (1)

497-515: Optimize memory usage with queue size limit.

The implementation uses a fixed queue size of 10000, which could be problematic for large datasets.

Consider making the queue size configurable:

-            max_queue = 10000
+            max_queue = min(10000, len(self))  # Adapt to dataset size
fiftyone/core/dataset.py (5)

2930-2931: Consider making the queue size configurable.

The hardcoded queue size of 10000 may not be optimal for all workloads. Consider making this configurable via a parameter to allow users to tune based on their memory constraints and performance needs.

-            max_queue = 10000
+            max_queue = kwargs.get("max_queue_size", 10000)

2967-2974: Enhance error handling with more context.

The error handling could be more informative by including the sample ID and original error message in the RuntimeError.

-                    raise RuntimeError(
-                        "Worker failed while processing sample"
-                    ) from e
+                    raise RuntimeError(
+                        f"Worker failed while processing sample: {str(e)}"
+                    ) from e

                 if warn_failures:
-                    logger.warning("Error processing sample: %s", e)
+                    logger.warning(
+                        "Error processing sample %s: %s",
+                        sample.id if hasattr(sample, "id") else "unknown",
+                        str(e)
+                    )

2933-2938: Consider adding progress tracking for result processing.

The progress tracking only covers sample iteration but not the actual processing of results. This could be misleading for long-running map functions.

             for sample in self.iter_samples(
-                progress=progress,
+                progress=False,  # Handle progress manually
                 autosave=autosave,
                 batch_size=batch_size,
                 batching_strategy=batching_strategy,
             ):
+                if progress:
+                    print(f"\rSubmitting samples: {len(futures)}/{len(self)}", end="")

2926-2928: Consider explicit ThreadPoolExecutor shutdown.

While the context manager handles shutdown implicitly, it may be better to explicitly shutdown with a timeout to handle hung worker threads.

-        with ThreadPoolExecutor(max_workers=num_workers) as executor:
+        executor = ThreadPoolExecutor(max_workers=num_workers)
+        try:
             futures = []
             result = []
             max_queue = 10000
+        finally:
+            executor.shutdown(wait=True, timeout=60)

2960-2975: Add type hints and improve variable names.

The method would benefit from type hints and more descriptive variable names for better code maintainability.

     def _process_future_result(
-        self, futures, result, skip_failures, warn_failures
+        self,
+        futures: List[Future],
+        results: List[Any],
+        skip_failures: bool,
+        warn_failures: bool
     ):
         for future in futures:
             try:
-                result.append(future.result())
+                results.append(future.result())
             except Exception as e:
                 if not skip_failures:
                     raise RuntimeError(
                         "Worker failed while processing sample"
                     ) from e

                 if warn_failures:
                     logger.warning("Error processing sample: %s", e)

-                result.append(None)
+                results.append(None)
🔇 Additional comments (2)
fiftyone/core/view.py (2)

521-526: LGTM: Proper validation of aggregate_func.

The code correctly validates that aggregate_func is callable before using it.


529-544: LGTM: Robust error handling implementation.

The error handling implementation with skip_failures and warn_failures is well-designed and follows the codebase patterns.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
fiftyone/core/view.py (1)

456-491: Add type hints for better IDE support.

Consider adding type hints to improve code maintainability and IDE support.

     def map_samples(
         self,
-        map_func,
-        map_func_args=None,
-        aggregate_func=None,
+        map_func: Callable[[Any], Any],
+        map_func_args: Optional[Any] = None,
+        aggregate_func: Optional[Callable[[List[Any]], Any]] = None,
fiftyone/core/dataset.py (2)

2923-2958: Improve thread pool and queue management.

The current implementation has several areas for improvement:

  1. ThreadPoolExecutor shutdown is implicit
  2. Queue size grows unbounded which could cause memory issues
  3. Progress tracking only covers sample iteration, not result processing
- futures = []
- result = []
- max_queue = 10000
+ futures_queue = Queue(maxsize=100000)  # Configurable queue size
+ completed = 0
+ total = len(self)

for sample in self.iter_samples(
    progress=progress,
    autosave=autosave,
    batch_size=batch_size,
    batching_strategy=batching_strategy,
):
    future = executor.submit(map_func, sample, map_func_args)
-   futures.append(future)
-   if len(futures) > max_queue:
+   while True:
+       try:
+           futures_queue.put(future, timeout=1)
+           break
+       except Queue.Full:
            self._process_future_result(
-               futures, result, skip_failures, warn_failures
+               futures_queue, result, skip_failures,
+               warn_failures, progress, completed, total
            )
-           futures = []

2960-2975: Enhance error handling and progress tracking in _process_future_result.

The helper method could be improved with better error messages and progress tracking.

def _process_future_result(
-   self, futures, result, skip_failures, warn_failures
+   self, futures_queue, result, skip_failures, warn_failures, progress, completed, total
):
-   for future in futures:
+   try:
+       future = futures_queue.get_nowait()
        try:
            result.append(future.result())
+           completed += 1
+           if progress:
+               print(f"\rProcessing results: {completed}/{total}", end="")
        except Exception as e:
            if not skip_failures:
                raise RuntimeError(
-                   "Worker failed while processing sample"
+                   f"Worker failed while processing sample: {str(e)}"
                ) from e

            if warn_failures:
-               logger.warning("Error processing sample: %s", e)
+               logger.warning(
+                   "Error processing sample %d/%d: %s",
+                   completed + 1, total, str(e)
+               )

            result.append(None)
+   except Queue.Empty:
+       pass
🔇 Additional comments (4)
fiftyone/core/view.py (3)

9-9: LGTM: Required imports added correctly.

The imports of ThreadPoolExecutor and logging are appropriate for implementing parallel processing with proper error handling.

Also applies to: 13-13, 35-35


529-544: LGTM: Well-implemented error handling.

The helper method correctly implements:

  • Proper error handling with try/except
  • Configurable error behavior with skip_failures and warn_failures
  • Clear warning messages
  • Result ordering preservation

508-509: 🛠️ Refactor suggestion

Improve map_func_args handling.

The current implementation passes map_func_args directly as a second argument, which may not work correctly if map_func_args is a tuple/list of arguments.

-                future = executor.submit(map_func, sample, map_func_args)
+                args = (map_func_args,) if map_func_args is not None else ()
+                future = executor.submit(map_func, sample, *args)

Likely invalid or redundant comment.

fiftyone/core/dataset.py (1)

2887-2922: 🛠️ Refactor suggestion

Add input validation for num_workers parameter.

The num_workers parameter should be validated to ensure it's a positive integer to avoid potential issues with ThreadPoolExecutor.

def map_samples(
    self,
    map_func,
    map_func_args=None,
    aggregate_func=None,
    num_workers=4,
    progress=False,
    autosave=False,
    batch_size=None,
    batching_strategy=None,
    skip_failures=True,
    warn_failures=False,
):
+   if not isinstance(num_workers, int) or num_workers < 1:
+       raise ValueError("num_workers must be a positive integer")

Likely invalid or redundant comment.

map_func,
map_func_args=None,
aggregate_func=None,
num_workers=4,
Contributor

🛠️ Refactor suggestion

Validate and adjust the number of workers.

The hardcoded value of 4 workers may not be optimal. Consider:

  1. Using multiprocessing.cpu_count() to set a default based on available CPUs
  2. Adding validation to ensure num_workers > 0
-        num_workers=4,
+        num_workers=None,

Add this validation at the start of the method:

if num_workers is None:
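    # assumes `import multiprocessing as mp` at module scope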
    num_workers = max(1, mp.cpu_count() - 1)
elif num_workers <= 0:
    raise ValueError("num_workers must be greater than 0")

@brimoor
Contributor

brimoor commented Jan 29, 2025

@brimoor I disagree; it is going in the right direction. Namely, it's a simple first step and has a 4x improvement over normal iter_samples() for this one test (as mentioned above, more testing is needed before merging).

Bringing everything into memory is a problem I agree so we'll have to fix that.

But everything else you describe consists of ideas for additional improvement, which we can explore next.

"Perfect is the enemy of good" - I don't think all these things need to be in place before we merge something.

iter_samples(autosave=True) is an ideal candidate for distributed computing. Each worker can read + compute + write its own data in isolation. There is no need to read sample objects in the main thread and pickle them.

The current pattern in this PR won't support swapping out multithreading or multiprocessing for a Hadoop cluster, for example. In a distributed setting, every worker definitely needs to do its own I/O.
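
To make the per-worker I/O point concrete, here is a minimal process-pool sketch (not this PR's implementation) in which each worker loads and saves its own samples; it still relies on a values("id") call, whose cost is discussed further below:

import multiprocessing as mp

import fiftyone as fo

def _process_batch(args):
    # Each worker does its own read + compute + write, in isolation
    dataset_name, sample_ids = args
    dataset = fo.load_dataset(dataset_name)
    for sample in dataset.select(sample_ids).iter_samples(autosave=True):
        sample["processed"] = True  # stand-in for the real map_func

def map_in_processes(dataset, num_workers=4, batch_size=1000):
    # values("id") is the expensive serialization step discussed below
    ids = dataset.values("id")
    batches = [
        (dataset.name, ids[i : i + batch_size])
        for i in range(0, len(ids), batch_size)
    ]
    with mp.Pool(num_workers) as pool:
        pool.map(_process_batch, batches)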

result. By default, the results are returned as a list
num_workers (4): the number of worker threads to use
progress (False): whether to render a progress bar (True/False)
autosave (False): whether to automatically save the results
Contributor

@brimoor brimoor Jan 29, 2025

Note that there's no point in map_samples(autosave=False). If you are applying changes to samples, you need to be saving them.

The only reason that iter_samples(autosave=False) makes sense is that the user is assumed to be calling sample.save() inside the for loop manually:

for sample in dataset.iter_samples():
    fcn(sample)
    sample.save()

Contributor Author

Is there a scenario where map_func does not modify the samples? For example, we could compute some large-scale statistics and aggregate the results at the end without modifying samples.

Contributor

Yeah I think that's right Minh. We should make the default autosave=True, but should leave it in, because if the caller says they don't need to save samples then we can elide that whole process.

Contributor

Ah you are actually supporting map-reduce here, not just map. In that case we might want to consider a slightly smarter default to make the syntax for common cases as clean as possible:

  • save=True by default if no aggregate_func is provided, because there's no point in mapping w/o saving
  • save=False by default if an aggregate_func is provided, to optimize for the case where you're computing a reduction and not actually editing samples

Per above, we may want to rename autosave -> save because there's no "manual" save option in this case.

We may also want to consider renaming the function, eg map_reduce(), as we have things like ViewExpression.map() that are just map functions, not reductions.
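
A sketch of that defaulting logic under the suggested rename (the signature is hypothetical):

def map_reduce(self, map_func, aggregate_func=None, save=None, **kwargs):
    if save is None:
        # mapping without saving is pointless, but a pure reduction
        # typically does not edit samples
        save = aggregate_func is None
    ...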

max_queue = 10000

# Submit samples to the worker pool
for sample in self.iter_samples(
Contributor

@brimoor brimoor Jan 29, 2025

I haven't run this code myself, but in a static review of it, I'm confused how this is even working when autosave=True.

iter_samples(autosave=True) immediately calls sample.save() on each sample as soon as control is returned to the for statement:

for sample in samples:
    yield sample
    if autosave:
        save_context.save(sample)

Isn't it possible for the thread pool here to accumulate a backlog and not actually apply map_func to a sample before the next loop iteration is started? Indeed, that would be the whole point of a pool, I assume.

One explanation for the reported speedup in the PR is that no DB updates are actually happening because sample.save() is getting called too early, before there are any edits to save!

I fundamentally don't understand how the approach in this PR could ever be much faster than iter_samples(autosave=True), given that it relies on a single _aggregate() to load data, relies on a single SaveContext to save data, and uses a thread pool to apply the functions, which isn't parallelized due to the GIL.

Maybe I'm misunderstanding something here, but I'm just very confused

Contributor Author

Yes, I am not 100% certain about the details of how saving works with iter_samples, so we are just hoping it will work; I think the next step is to check that the results are correct (same stats, samples with correct fields, etc.).

However, we are testing two batching strategies (latency and static) and the numbers look fairly consistent, so I think there is a good chance that the batch update works (but not 100% sure).

Can we check if a sample has an unsaved update? I can run a validation at the end of parallel processing to figure out if there is any unsaved update.

Contributor Author

Another solution is to call sample.save(...) manually, or to integrate the thread implementation with an auto-save batcher; in that case, the updates will be performed correctly.

@kaixi-wang
Contributor

For the parallelization I think Brian is describing, we would need to initialize the mongo client from within map_samples (this would also allow us to make more connections to mongo since mongo limits the total number of connections per client). Then call iter_samples inside the worker function on a view defined by the args.

To avoid serializing a long list of ids from a values() call (which itself can be expensive I/O), while not relying on the order of samples returned in slices defined by skip/limit (which may not be stable), I think we can try defining the view we iterate over by passing a min/max _id, so each worker starts with a match stage on an indexed field, which will also help with performance.

There will be some overhead in getting the bounds upfront, but it's hopefully negligible
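
A hedged sketch of the _id-range idea, assuming the boundaries are already known; the raw $match hits the default _id index, and the mongo() view stage is used to sidestep any questions about ObjectId support in view expressions:

from bson import ObjectId

import fiftyone as fo

def id_range_view(dataset, min_id, max_id):
    # Samples with min_id <= _id < max_id, via an indexed $match
    return dataset.mongo(
        [{"$match": {"_id": {"$gte": ObjectId(min_id), "$lt": ObjectId(max_id)}}}]
    )

def worker(dataset_name, min_id, max_id):
    # Hypothetical worker: owns its own connection, iterates its own range
    dataset = fo.load_dataset(dataset_name)
    for sample in id_range_view(dataset, min_id, max_id).iter_samples(
        autosave=True
    ):
        ...  # apply map_func here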

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (4)
fiftyone/core/view.py (4)

9-9: Remove unused import.

The ThreadPoolExecutor import is not used in the implementation. Consider removing it since the code uses threading.Thread directly.

-from concurrent.futures import ThreadPoolExecutor


474-474: Consider making MAX_QUEUE_SIZE configurable.

A hardcoded queue size of 10000 might not be optimal for all use cases. Consider making this a configurable parameter with a reasonable default.

-        MAX_QUEUE_SIZE = 10000
+        max_queue_size = kwargs.pop('max_queue_size', 10000)

457-561: Consider using ThreadPoolExecutor for better thread management.

The current implementation manually manages threads. Using ThreadPoolExecutor would provide better thread lifecycle management and a cleaner implementation.

Here's a suggested refactor:

def map_samples(
    self,
    map_func,
    map_func_args=None,
    aggregate_func=None,
    num_workers=4,
    progress=False,
    autosave=False,
    batch_size=None,
    batching_strategy=None,
    skip_failures=True,
    warn_failures=False,
):
    """Maps a function over the samples in the dataset using parallel processing."""
    if not callable(map_func):
        raise ValueError("map_func must be callable")

    results = []
    sample_count = 0
    
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = []
        
        # Submit samples to the worker pool
        for sample in self.iter_samples(
            progress=progress,
            autosave=autosave,
            batch_size=batch_size,
            batching_strategy=batching_strategy,
        ):
            future = executor.submit(map_func, sample, map_func_args)
            futures.append(future)
            sample_count += 1
            
        # Collect results
        for future in futures:
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                if not skip_failures:
                    raise RuntimeError(
                        "Worker failed while processing sample"
                    ) from e
                
                if warn_failures:
                    logger.warning("Error processing sample: %s", e)
                results.append(None)
                
    # Check result count
    if len(results) != sample_count:
        message = (
            f"Worker threads did not process all samples, getting "
            f"{len(results)} results out of {sample_count}"
        )
        if not skip_failures:
            raise RuntimeError(message)
        elif warn_failures:
            logger.warning(message)
            
    # Apply aggregation if specified
    if aggregate_func is not None:
        if callable(aggregate_func):
            results = aggregate_func(results)
        else:
            raise ValueError("aggregate_func must be callable")
            
    return results
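
For reference, a usage sketch of this signature (the field name and the attachment to views are assumptions):

# Count detections per sample and sum them, without editing samples
def count_dets(sample, _args=None):
    return len(sample["detections"].detections)

total = view.map_samples(
    count_dets,
    aggregate_func=sum,
    num_workers=4,
    progress=True,
)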

562-578: Remove unused helper method.

The _process_future_result method appears to be unused in the current implementation. Consider removing it if it's no longer needed.


result_queue.put(("error", e))
work_queue.task_done()
except queue.Empty:
print(f"Worker thread {thread_name} exiting")
Contributor

🛠️ Refactor suggestion

Use logger for worker thread messages.

Replace print statement with proper logging for consistency.

-                    print(f"Worker thread {thread_name} exiting")
+                    logger.debug("Worker thread %s exiting", thread_name)

Comment on lines +489 to +491
print(
f"Warning: Failed to process sample: {e}"
)
Contributor

🛠️ Refactor suggestion

Use logger instead of print statements.

Replace print statements with proper logging calls since a logger is already configured.

-                                print(
-                                    f"Warning: Failed to process sample: {e}"
-                                )
+                                logger.warning("Failed to process sample: %s", e)

@minhtuev
Contributor Author

minhtuev commented Jan 29, 2025

Updated implementation of thread execution using dedicated threads with input and output queues to minimize resource allocations/deallocations.

Experiment script: https://gist.github.com/minhtuev/834e2efb1c1538097912b7aa07ee5a63
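
For readers skimming past the gist, a minimal sketch of the dedicated-thread pattern (queue names mirror the snippets above; the details are illustrative):

import queue
import threading

def worker(work_queue, result_queue, map_func):
    # Each dedicated thread loops until the work queue drains
    while True:
        try:
            sample = work_queue.get(timeout=1)
        except queue.Empty:
            break  # no more work; thread exits
        try:
            result_queue.put(("ok", map_func(sample)))
        except Exception as e:
            result_queue.put(("error", e))
        finally:
            work_queue.task_done()

def run(samples, map_func, num_workers=4):
    work_queue, result_queue = queue.Queue(), queue.Queue()
    for sample in samples:
        work_queue.put(sample)

    threads = [
        threading.Thread(target=worker, args=(work_queue, result_queue, map_func))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()

    work_queue.join()
    for t in threads:
        t.join()

    return [result_queue.get_nowait() for _ in range(result_queue.qsize())]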

Results Summary:
----------------------------------------------------------------------
   Workers | Batch Size |   Time (s) |  Completed
----------------------------------------------------------------------
        12 |          8 |     122.80 |        Yes
         2 |          4 |     146.66 |        Yes
        16 |          8 |     162.61 |        Yes
         8 |          8 |     188.54 |        Yes
        12 |          2 |     198.37 |        Yes
         1 |          4 |     199.56 |        Yes
         8 |          1 |     200.04 |        Yes
         4 |          1 |     201.11 |        Yes
        16 |          1 |     201.25 |        Yes
         4 |          2 |     202.36 |        Yes
        12 |          1 |     203.99 |        Yes
         2 |          2 |     204.63 |        Yes
         8 |          2 |     204.77 |        Yes
         1 |          2 |     206.53 |        Yes
         4 |       None |     208.45 |        Yes
        12 |       None |     208.63 |        Yes
         1 |       None |     209.12 |        Yes
         4 |          8 |     212.51 |        Yes
         1 |          8 |     213.84 |        Yes
        16 |          2 |     215.00 |        Yes
         2 |          8 |     215.75 |        Yes
        16 |       None |     215.94 |        Yes
         8 |       None |     216.11 |        Yes
         4 |          4 |     216.12 |        Yes
         2 |       None |     221.50 |        Yes
         2 |          1 |     223.68 |        Yes
         8 |          4 |     226.68 |        Yes
        12 |          4 |     229.27 |        Yes
        16 |          4 |     231.98 |        Yes
         1 |          1 |     241.54 |        Yes
----------------------------------------------------------------------

Experiment Note: using dedicated threads to minimize resource allocation

@swheaton
Contributor

For the parallelization I think Brian is describing, we would need to initialize the mongo client from within map_samples (this would also allow us to make more connections to mongo since mongo limits the total number of connections per client). Then call iter_samples inside the worker function on a view defined by the args.

To avoid serializing a long list of ids from a values() call (which itself can be expensive I/O), while not relying on the order of samples returned in slices defined by skip/limit (which may not be stable), I think we can try defining the view we iterate over by passing a min/max _id, so each worker starts with a match stage on an indexed field, which will also help with performance.

There will be some overhead in getting the bounds upfront, but it's hopefully negligible

Yeah, this is what concerned me, and why I thought we should read in a single thread and farm out the objects elsewhere. If we prove we must go further in order to improve performance, then we will have to start making these tradeoffs.

You have a good idea though. But how do you split 10M IDs into N batches defined by [min_id, max_id]? We would have to do a full IXscan I think, UNLESS we make an assumption about ID distribution, which I don't think we can.

@kaixi-wang
Contributor

But how do you split 10M ID's into N batches defined by [min_id, max_id]? We would have to do a full IXscan I think, UNLESS we make an assumption about ID distribution, which I don't think we can.

That might be a spike. But fwiw, an IX scan on _id for 10 million samples takes less than a second.
I agree we definitely cannot make an assumption on distribution without additional metadata that doesn't exist now

@swheaton
Contributor

But how do you split 10M ID's into N batches defined by [min_id, max_id]? We would have to do a full IXscan I think, UNLESS we make an assumption about ID distribution, which I don't think we can.

That might be a spike. But fwiw, an IX scan on _id for 10 million samples takes less than a second. I agree we definitely cannot make an assumption on distribution without additional metadata that doesn't exist now

Still don't know why hint has such an impact, but I'll take it. And this is still true if sorted by _id, I assume? This could be a good approach then, I like it. We wouldn't have to store all IDs, just rip through and create batches of rough size.
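
A sketch of that boundary computation: a covered, _id-only scan in index order, recording every Nth _id (the collection handle and pymongo usage are illustrative):

def compute_id_boundaries(collection, num_batches):
    # Project only _id so the query stays covered by the _id index;
    # hint() mirrors the observation above about its impact
    total = collection.estimated_document_count()
    chunk = max(1, total // num_batches)

    boundaries = []
    cursor = collection.find({}, {"_id": 1}).sort("_id", 1).hint([("_id", 1)])
    for i, doc in enumerate(cursor):
        if i % chunk == 0:
            boundaries.append(doc["_id"])

    return boundaries  # consecutive pairs define [min_id, max_id) ranges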
