#545 Adding a callback stream filter to ease stream manipulation
nyamsprod committed Jan 3, 2025
1 parent b6a735b commit e19551b
Showing 5 changed files with 259 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ All Notable changes to `Csv` will be documented in this file
### Added

- Adding the `TabularDataReader::map` method.
- Adding `CallbackStreamFilter` class

### Deprecated

52 changes: 52 additions & 0 deletions docs/9.0/interoperability/callback-stream-filter.md
@@ -0,0 +1,52 @@
---
layout: default
title: Callback Stream Filter
---

# Callback Stream Filter

<p class="message-info">Available since version <code>9.22.0</code></p>

Sometimes you need a dedicated stream filter to solve one specific problem.
Instead of going through the hassle of writing a fully fledged stream filter class,
you can use the `CallbackStreamFilter`. It is a PHP stream filter that applies
a callable to the stream content before it is consumed by the CSV process.

## Usage with CSV objects

The filter does not work out of the box: it requires a unique name and a callback to be usable.
Once registered, you can reuse the filter with CSV documents or with a resource.

Let's imagine we have a CSV document that uses the carriage return character (`\r`) as its end-of-line character.
The package can parse this type of document, but only if you enable the deprecated `auto_detect_line_endings` ini setting.

If you no longer want to rely on that setting, since it emits a deprecation warning, you can use the new
`CallbackStreamFilter` instead to swap the offending character for a modern alternative.

```php
use League\Csv\CallbackStreamFilter;
use League\Csv\Reader;

$csv = "title1,title2,title3\rcontent11,content12,content13\rcontent21,content22,content23\r";

$document = Reader::createFromString($csv);
CallbackStreamFilter::addTo(
    $document,
    'swap.carrier.return',
    fn (string $bucket): string => str_replace("\r", "\n", $bucket)
);
$document->setHeaderOffset(0);
return $document->first();
// returns ['title1' => 'content11', 'title2' => 'content12', 'title3' => 'content13']
```

The `addTo` method registers the filter under the unique `swap.carrier.return` name and then attaches
it to the CSV document object on read.

<p class="message-warning">On read, the CSV document content is <strong>never changed or replaced</strong>.
Conversely, the changes <strong>are persisted during writing</strong>.</p>
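
The difference between the two directions can be seen with plain PHP streams. The sketch below
does not use the package at all: it relies on PHP's built-in `string.toupper` filter and a
`php://memory` stream as stand-ins, both chosen here purely for illustration. It shows that a read
filter only alters what the consumer sees, whereas a write filter alters what gets stored.

```php
$raw = "a,b\n";

// Read side: the filter transforms the data handed to the consumer.
$in = fopen('php://memory', 'r+');
fwrite($in, $raw);
rewind($in);
$readFilter = stream_filter_append($in, 'string.toupper', STREAM_FILTER_READ);
$seen = stream_get_contents($in);

// Remove the filter and re-read to inspect the bytes actually stored.
stream_filter_remove($readFilter);
rewind($in);
$storedAfterRead = stream_get_contents($in);
fclose($in);

// Write side: the filter transforms the data before it is stored.
$out = fopen('php://memory', 'r+');
$writeFilter = stream_filter_append($out, 'string.toupper', STREAM_FILTER_WRITE);
fwrite($out, $raw);
stream_filter_remove($writeFilter);
rewind($out);
$storedAfterWrite = stream_get_contents($out);
fclose($out);
```

Here `$seen` holds the upper-cased content while `$storedAfterRead` is still the original string;
`$storedAfterWrite`, on the other hand, holds the upper-cased content because the write filter ran
before the bytes reached the stream.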

Of course, the `CallbackStreamFilter` can be used in other scenarios or with PHP stream resources.
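
For the stream resource case, the underlying mechanism can be sketched in plain PHP. The example
below is a simplified, self-contained illustration and not the package's code: the
`CallbackFilterSketch` class and the `sketch.callback` filter name are hypothetical. With the
package installed, `CallbackStreamFilter::appendTo()` and `CallbackStreamFilter::prependTo()`
accept the resource, a name and the callback instead.

```php
// Hypothetical stand-in for illustration only; not part of league/csv.
final class CallbackFilterSketch extends php_user_filter
{
    public function onCreate(): bool
    {
        // Refuse to attach unless a Closure was supplied through the params.
        return is_array($this->params)
            && ($this->params['callback'] ?? null) instanceof Closure;
    }

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        $callback = $this->params['callback'];
        while (null !== ($bucket = stream_bucket_make_writeable($in))) {
            // Count the incoming bytes, then hand the chunk to the callback.
            $consumed += $bucket->datalen;
            $bucket->data = $callback($bucket->data);
            stream_bucket_append($out, $bucket);
        }

        return PSFS_PASS_ON;
    }
}

stream_filter_register('sketch.callback', CallbackFilterSketch::class);

$stream = fopen('php://memory', 'r+');
fwrite($stream, "title1,title2\rcontent11,content12\r");
rewind($stream);

// Attach on read only, so the stored bytes stay untouched.
stream_filter_append($stream, 'sketch.callback', STREAM_FILTER_READ, [
    'callback' => fn (string $chunk): string => str_replace("\r", "\n", $chunk),
]);

$filtered = stream_get_contents($stream);
fclose($stream);
```

The callback receives the stream content chunk by chunk, so it should only perform replacements
that remain correct when a chunk boundary falls in the middle of the data, as the single-character
swap does here.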


1 change: 1 addition & 0 deletions docs/_data/menu.yml
@@ -27,6 +27,7 @@ version:
Force Enclosure : '/9.0/interoperability/enclose-field/'
Handling Delimiter : '/9.0/interoperability/swap-delimiter/'
Formula Injection : '/9.0/interoperability/escape-formula-injection/'
Callback Stream Filter : '/9.0/interoperability/callback-stream-filter/'
Converting Records:
Overview: '/9.0/converter/'
Charset Converter: '/9.0/converter/charset/'
140 changes: 140 additions & 0 deletions src/CallbackStreamFilter.php
@@ -0,0 +1,140 @@
<?php

/**
 * League.Csv (https://csv.thephpleague.com)
 *
 * (c) Ignace Nyamagana Butera <nyamsprod@gmail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

declare(strict_types=1);

namespace League\Csv;

use Closure;
use php_user_filter;
use RuntimeException;
use TypeError;

use function array_key_exists;
use function is_resource;

final class CallbackStreamFilter extends php_user_filter
{
    private const FILTER_NAME = 'string.league.csv.stream.callback.filter';

    public static function getFiltername(string $name): string
    {
        return self::FILTER_NAME.'.'.$name;
    }

    /**
     * Static method to register the class as a stream filter.
     */
    public static function register(string $name): void
    {
        $filtername = self::getFiltername($name);
        if (!in_array($filtername, stream_get_filters(), true)) {
            stream_filter_register($filtername, self::class);
        }
    }

    /**
     * Static method to attach the stream filter to a CSV Reader or Writer instance.
     */
    public static function addTo(AbstractCsv $csv, string $name, callable $callback): void
    {
        self::register($name);

        $csv->addStreamFilter(self::getFiltername($name), [
            'name' => $name,
            'callback' => $callback instanceof Closure ? $callback : $callback(...),
        ]);
    }

    /**
     * @param resource $stream
     * @param callable(string): string $callback
     *
     * @throws TypeError
     * @throws RuntimeException
     *
     * @return resource
     */
    public static function appendTo(mixed $stream, string $name, callable $callback): mixed
    {
        self::register($name);

        is_resource($stream) || throw new TypeError('Argument passed must be a stream resource, '.gettype($stream).' given.');
        'stream' === ($type = get_resource_type($stream)) || throw new TypeError('Argument passed must be a stream resource, '.$type.' resource given');

        set_error_handler(fn (int $errno, string $errstr, string $errfile, int $errline) => true);
        $filter = stream_filter_append($stream, self::getFiltername($name), params: [
            'name' => $name,
            'callback' => $callback instanceof Closure ? $callback : $callback(...),
        ]);
        restore_error_handler();

        if (!is_resource($filter)) {
            throw new RuntimeException('Could not append the registered stream filter: '.self::getFiltername($name));
        }

        return $filter;
    }

    /**
     * @param resource $stream
     * @param callable(string): string $callback
     *
     * @throws TypeError
     * @throws RuntimeException
     *
     * @return resource
     */
    public static function prependTo(mixed $stream, string $name, callable $callback): mixed
    {
        self::register($name);

        is_resource($stream) || throw new TypeError('Argument passed must be a stream resource, '.gettype($stream).' given.');
        'stream' === ($type = get_resource_type($stream)) || throw new TypeError('Argument passed must be a stream resource, '.$type.' resource given');

        $filtername = self::getFiltername($name);
        set_error_handler(fn (int $errno, string $errstr, string $errfile, int $errline) => true);
        // Prepend (not append) so the filter runs first in the stream's filter chain.
        $filter = stream_filter_prepend($stream, $filtername, params: [
            'name' => $name,
            'callback' => $callback instanceof Closure ? $callback : $callback(...),
        ]);
        restore_error_handler();

        if (!is_resource($filter)) {
            throw new RuntimeException('Could not prepend the registered stream filter: '.$filtername);
        }

        return $filter;
    }

    public function onCreate(): bool
    {
        return is_array($this->params) &&
            array_key_exists('name', $this->params) &&
            self::getFiltername($this->params['name']) === $this->filtername &&
            array_key_exists('callback', $this->params) &&
            $this->params['callback'] instanceof Closure
        ;
    }

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        /** @var Closure(string): string $callback */
        $callback = $this->params['callback']; /* @phpstan-ignore-line */
        while (null !== ($bucket = stream_bucket_make_writeable($in))) {
            $bucket->data = ($callback)($bucket->data);
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }

        return PSFS_PASS_ON;
    }
}
65 changes: 65 additions & 0 deletions src/CallbackStreamFilterTest.php
@@ -0,0 +1,65 @@
<?php

/**
 * League.Csv (https://csv.thephpleague.com)
 *
 * (c) Ignace Nyamagana Butera <nyamsprod@gmail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

declare(strict_types=1);

namespace League\Csv;

use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\TestCase;

use function str_replace;

final class CallbackStreamFilterTest extends TestCase
{
    #[Test]
    public function it_can_swap_the_delimiter_on_read(): void
    {
        $document = <<<CSV
observedOn💩temperature💩place
2023-10-01💩18💩Yamoussokro
2023-10-02💩21💩Yamoussokro
2023-10-03💩15💩Yamoussokro
2023-10-01💩22💩Abidjan
2023-10-02💩19💩Abidjan
2023-10-03💩💩Abidjan
CSV;

        $reader = Reader::createFromString($document);
        $reader->setDelimiter("\x02");
        CallbackStreamFilter::addTo(
            $reader,
            'swap.delimiter.in',
            fn (string $bucket): string => str_replace('💩', "\x02", $bucket)
        );
        $reader->setHeaderOffset(0);

        self::assertSame(
            ['observedOn' => '2023-10-01', 'temperature' => '18', 'place' => 'Yamoussokro'],
            $reader->first()
        );
    }

    #[Test]
    public function it_can_swap_the_delimiter_on_write(): void
    {
        $writer = Writer::createFromString();
        $writer->setDelimiter("\x02");
        CallbackStreamFilter::addTo(
            $writer,
            'swap.delimiter.out',
            fn (string $bucket): string => str_replace("\x02", '💩', $bucket)
        );

        $writer->insertOne(['observedOn' => '2023-10-01', 'temperature' => '18', 'place' => 'Yamoussokro']);
        self::assertSame('2023-10-01💩18💩Yamoussokro'."\n", $writer->toString());
    }
}
