-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_cmd.py
142 lines (107 loc) · 3.94 KB
/
test_cmd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import io
import os
import subprocess
from typing import List
import pytest
import pysh
from pysh import cmd
def test_pipeline():
    '''Check basic pipelines: `cat | splitlines`, `run` into `slurp`, and a bare command.

    The `git rev-parse` expectation depends on this repository's history.
    '''
    # Read the reference bytes under a context manager so the file handle is
    # closed deterministically instead of being left to the garbage collector.
    with open('/etc/shells', 'rb') as f:
        contents = f.read()
    # Drop at most ONE trailing newline: `test_splitlines` expects
    # `cmd.splitlines` to keep empty trailing lines ('1\n\n' -> ['1', '']),
    # so stripping every trailing newline would make this comparison fail on
    # a file that ends with blank lines.
    if contents.endswith(b'\n'):
        contents = contents[:-1]
    assert list(
        cmd.cat(b'/etc/shells') | cmd.splitlines()
        # sh { cat /etc/shells | split -l }
    ) == contents.split(b'\n')

    assert (
        pysh.slurp(cmd.run('git rev-parse {}', 'dbccdbe6f~2'))
        # sh { git rev-parse ${commitish} | slurp }
    ) == b'91a20bf6b4a72f1f84b2f57cf38b3f771dd35fda'

    # Input and output optional; check nothing blows up.
    cmd.devnull()()
def test_echo():
    '''Slurping `echo` of two byte strings should give them joined by a space.'''
    output = pysh.slurp(cmd.echo(b'hello', b'world'))
    assert output == b'hello world'
def test_decode():
    '''Non-ASCII bytes should survive a `decode` then `encode` round trip.'''
    world_bytes = '\N{WORLD MAP}'.encode()
    round_trip = cmd.echo(b'hello', world_bytes) | cmd.decode() | cmd.encode()
    assert pysh.slurp(round_trip) == b'hello ' + world_bytes
def test_splitlines():
    '''Compare `cmd.splitlines` on echoed text with `str.split` and literal expectations.'''

    def echo_splitlines(text: str) -> List[str]:
        # Echo the encoded text with `ln=False` (presumably: no newline is
        # appended by `echo` -- confirm against `cmd.echo`) and decode each
        # item the pipeline yields.
        pipeline = cmd.echo(text.encode(), ln=False) | cmd.splitlines()
        return [piece.decode() for piece in pipeline]

    def check_splitlines(text: str, expected: List[str]) -> None:
        actual = echo_splitlines(text)
        # Reference result: remove a single trailing newline, then split.
        if text.endswith('\n'):
            trimmed = text[:-1]
        else:
            trimmed = text
        by_split = trimmed.split('\n')
        assert actual == by_split
        assert by_split == expected

    cases = [
        ('1', ['1']),
        ('1\n', ['1']),
        ('1\n\n', ['1', '']),
        ('\n1\n\n2\n', ['', '1', '', '2']),
    ]
    for text, expected in cases:
        check_splitlines(text, expected)
def test_splitlines_chunks():
    '''Manipulate chunk boundaries in `splitlines` input.'''

    class WriteChunks:
        '''Reader stub whose `read1` hands back one prepared chunk per call.'''

        def __init__(self, ss: List[str]) -> None:
            # Encode eagerly; the iterator then yields chunks in list order.
            self.pending = iter([s.encode() for s in ss])

        def read1(self, size: int = -1) -> bytes:
            # `size` is ignored; once the chunks run out every call gives b''.
            return next(self.pending, b'')

    def resplit(chunks: List[str]) -> List[str]:
        lines = cmd.splitlines().thunk(WriteChunks(chunks), None)
        return [line.decode() for line in lines]

    def check_resplit(chunks: List[str]) -> None:
        # Splitting must not depend on where the chunk boundaries fall:
        # feeding the pieces one by one must match feeding them concatenated.
        chunked = resplit(chunks)
        whole = resplit([''.join(chunks)])
        assert chunked == whole

    for chunks in (
        ['\n', '\n'],
        ['1', '\n'],
        ['1', '\n', '\n'],
        ['1\n', '\n'],
        ['1', '\n\n'],
        ['1', '\n', '2'],
        ['1', '\n2'],
        ['1\n', '2'],
    ):
        check_resplit(chunks)
def test_run():
    '''Exercise `cmd.run` as a source, as a filter, and chained with itself.'''
    # (Commits in this repo's history.)
    parent_lines = list(
        cmd.run('git log --abbrev=9 --format={} {}', '%p', '9cdfc6d46')
        | cmd.splitlines()
    )
    assert parent_lines == [b'a515d0250', b'c90596c89', b'']

    # sh { echo hello | tr h H | slurp }
    capitalized = pysh.slurp(cmd.echo(b'hello') | cmd.run('tr h H'))
    assert capitalized == b'Hello'

    # Three external commands piped into one another.
    first_match = (
        cmd.run('git log --oneline --reverse --abbrev=9')
        | cmd.run('grep -m1 {}', 'yield')
        | cmd.run('perl -lane {}', 'print $F[0]')
    )
    assert pysh.slurp(first_match) == b'91a20bf6b'
def test_run_input_file():
    '''Feed `cmd.run` from a file object wrapping a pipe; collect output in a buffer.'''
    read_end, write_end = os.pipe()
    os.write(write_end, b'abc')
    # Close the write end so a reader of the pipe reaches end-of-file.
    os.close(write_end)

    captured = io.BytesIO()
    with os.fdopen(read_end) as source:
        cmd.run('perl -lpe print').thunk(source, captured)
    assert captured.getvalue() == b'abc\nabc\n'
def test_run_check():
    '''A failing command should raise unless `_check=False` is passed.'''
    with pytest.raises(subprocess.CalledProcessError):
        pysh.slurp(cmd.run('false'))

    # With checking disabled, the failure yields empty output instead.
    unchecked_output = pysh.slurp(cmd.run('false', _check=False))
    assert unchecked_output == b''
def test_run_stderr(capfd):
    '''Check where a child's stderr goes for each `_stderr` setting of `cmd.run`.'''
    # On `capfd`, see pytest docs:
    # http://doc.pytest.org/en/latest/capture.html#accessing-captured-output-from-a-test-function

    def stdout_lines(**run_kwargs):
        # Run a shell snippet that writes `a` and `b` to stdout and `ERR` to
        # stderr, returning the lines that come out of the pipeline.
        pipeline = (
            cmd.run('sh -c {}', 'echo a; echo ERR >&2; echo b', **run_kwargs)
            | cmd.splitlines()
        )
        return list(pipeline)

    # No `_stderr` given: ERR is expected on the test process's own stderr.
    assert stdout_lines() == [b'a', b'b']
    assert capfd.readouterr() == ('', 'ERR\n')

    # DEVNULL: ERR is expected to show up nowhere.
    assert stdout_lines(_stderr=subprocess.DEVNULL) == [b'a', b'b']
    assert capfd.readouterr() == ('', '')

    # STDOUT: ERR is expected interleaved with the pipeline's output.
    assert stdout_lines(_stderr=subprocess.STDOUT) == [b'a', b'ERR', b'b']
    assert capfd.readouterr() == ('', '')