Skip to content

Commit

Permalink
wip on kislyuk#84
Browse files Browse the repository at this point in the history
  • Loading branch information
vergenzt committed Apr 17, 2020
1 parent b392deb commit 57e46e2
Showing 1 changed file with 65 additions and 40 deletions.
105 changes: 65 additions & 40 deletions yq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from __future__ import absolute_import, division, print_function, unicode_literals

import sys, argparse, subprocess, json
import sys, argparse, subprocess, json, os, os.path, tempfile
from collections import OrderedDict
from datetime import datetime, date, time

Expand All @@ -27,7 +27,46 @@ def default(self, o):
return o.isoformat()
return json.JSONEncoder.default(self, o)

def decode_docs(jq_output, json_decoder):
def mktempfifo(input_stream, suffix, program_name=sys.argv[0]):
    """Create a named pipe (FIFO) alongside the file backing ``input_stream``.

    The FIFO is placed in the same directory as ``input_stream.name`` and is
    named ``<basename>.tmp_<random><suffix>``.

    :param input_stream: File object whose ``name`` attribute is the path of
        the input file. ``sys.stdin`` gets no FIFO.
    :param suffix: Suffix appended to the generated FIFO name (e.g. ``".json"``).
    :param program_name: Name used as the prefix of the error message.
    :returns: The path of the created FIFO, or ``None`` when ``input_stream``
        is ``sys.stdin`` or the FIFO could not be created (a message is
        written to stderr in the latter case).
    """
    if input_stream is sys.stdin:
        return None

    directory, basename = os.path.split(input_stream.name)
    # mktemp() only picks a currently unused name; os.mkfifo() refuses to
    # overwrite an existing path, so a lost race surfaces as an error below
    # instead of clobbering a file.
    fifo_name = tempfile.mktemp(dir=directory, prefix=basename + ".tmp_", suffix=suffix)
    try:
        os.mkfifo(fifo_name)
    except OSError as e:
        # OSError rather than FileExistsError: the latter does not exist on
        # Python 2, and other failures (missing directory, permissions) should
        # be reported and skipped the same way.
        msg = "{}: Error creating fifo {} for file {}: {}. Skipping."
        sys.stderr.write(msg.format(program_name, fifo_name, input_stream.name, type(e).__name__) + "\n")
        return None
    return fifo_name

def stream_input_docs(input_streams_with_targets, input_format):
    """Convert every input document to JSON and write it to its target path.

    :param input_streams_with_targets: Iterable of ``(input_stream, target)``
        pairs, where ``target`` is the path to open for writing (a FIFO name
        as returned by ``mktempfifo``) or ``None``.
    :param input_format: Format name passed through to ``load_input_docs``.
    """
    for input_stream, target in input_streams_with_targets:
        if target is None:
            # NOTE(review): pairs without a target (stdin, or a FIFO that
            # could not be created) are skipped here; confirm whether stdin
            # input should instead be forwarded to jq's own stdin.
            continue
        with open(target, "w") as input_fifo:
            for input_doc in load_input_docs(input_stream, input_format):
                # JSONDateTimeEncoder serializes date/time values that the
                # plain encoder rejects.
                json.dump(input_doc, input_fifo, cls=JSONDateTimeEncoder)
                # Separate documents so adjacent scalars are not fused
                # into a single JSON token.
                input_fifo.write("\n")

def load_input_docs(input_stream, input_format, use_annotations=False):
    """Lazily yield the documents parsed from ``input_stream``.

    :param input_stream: Readable stream holding the raw input.
    :param input_format: One of ``"yaml"``, ``"xml"`` or ``"toml"``.
    :param use_annotations: Passed to ``get_loader`` when parsing YAML;
        ignored for the other formats.
    :raises Exception: If ``input_format`` is not a known format. Because this
        is a generator, the error is raised on first iteration, not at call
        time.
    """
    if input_format == "yaml":
        loader = get_loader(use_annotations=use_annotations)
        for doc in yaml.load_all(input_stream, Loader=loader):
            yield doc
    elif input_format == "xml":
        # Imported lazily so the dependency is only needed for XML input.
        import xmltodict
        yield xmltodict.parse(input_stream.read(), disable_entities=True)
    elif input_format == "toml":
        # Imported lazily so the dependency is only needed for TOML input.
        import toml
        yield toml.load(input_stream)
    else:
        raise Exception("Unknown input format")

def decode_output_docs(jq_output, json_decoder):
while jq_output:
doc, pos = json_decoder.raw_decode(jq_output)
jq_output = jq_output[pos + 1:]
Expand Down Expand Up @@ -62,6 +101,11 @@ def close(self):
def __getattr__(self, a):
    """Forward lookups of attributes not found on this object to ``self.fh``."""
    wrapped = self.fh
    return getattr(wrapped, a)

class LazyFifodInputStream:
    """Holder for an input stream (work in progress: it only stores the stream)."""

    def __init__(self, input_stream):
        """Remember ``input_stream`` on the instance."""
        self.input_stream = input_stream


def cli(args=None, input_format="yaml", program_name="yq"):
parser = get_parser(program_name, __doc__)
argcomplete.autocomplete(parser)
Expand Down Expand Up @@ -140,9 +184,16 @@ def yq(input_streams=None, output_stream=None, input_format="yaml", output_forma
exit_func = sys.exit
converting_output = True if output_format != "json" else False

def lazy_fifo(jq_stdin)
input_streams_with_lazy_fifos = [
(input_stream, mktempfifo(input_stream, ".json", program_name))
for input_stream
in input_streams
]

try:
# Note: universal_newlines is just a way to induce subprocess to make stdin a text buffer and encode it for us
jq = subprocess.Popen(["jq"] + list(jq_args),
jq = subprocess.Popen(["jq"] + list(jq_args) + [fifo_name for _, fifo_name in input_streams_with_fifo_names],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE if converting_output else None,
universal_newlines=True)
Expand All @@ -151,34 +202,27 @@ def yq(input_streams=None, output_stream=None, input_format="yaml", output_forma
exit_func(msg.format(program_name, type(e).__name__, e))

try:
stream_input_docs(input_streams_with_lazy_fifos, input_format)

if converting_output
decode_output_stream(

if converting_output:
# TODO: enable true streaming in this branch (with asyncio, asyncproc, a multi-shot variant of
# subprocess.Popen._communicate, etc.)
# See https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
use_annotations = True if output_format == "annotated_yaml" else False
input_docs = []
for input_stream in input_streams:
if input_format == "yaml":
loader = get_loader(use_annotations=use_annotations)
input_docs.extend(yaml.load_all(input_stream, Loader=loader))
elif input_format == "xml":
import xmltodict
input_docs.append(xmltodict.parse(input_stream.read(), disable_entities=True))
elif input_format == "toml":
import toml
input_docs.append(toml.load(input_stream))
else:
raise Exception("Unknown input format")
input_payload = "\n".join(json.dumps(doc, cls=JSONDateTimeEncoder) for doc in input_docs)
jq_out, jq_err = jq.communicate(input_payload)



json_decoder = json.JSONDecoder(object_pairs_hook=OrderedDict)
if output_format == "yaml" or output_format == "annotated_yaml":
yaml.dump_all(decode_docs(jq_out, json_decoder), stream=output_stream,
yaml.dump_all(decode_output_docs(jq_out, json_decoder), stream=output_stream,
Dumper=get_dumper(use_annotations=use_annotations, indentless=indentless_lists),
width=width, allow_unicode=True, default_flow_style=False)
elif output_format == "xml":
import xmltodict
for doc in decode_docs(jq_out, json_decoder):
for doc in decode_output_docs(jq_out, json_decoder):
if xml_root:
doc = {xml_root: doc}
elif not isinstance(doc, OrderedDict):
Expand All @@ -197,7 +241,7 @@ def yq(input_streams=None, output_stream=None, input_format="yaml", output_forma
output_stream.write(b"\n" if sys.version_info < (3, 0) else "\n")
elif output_format == "toml":
import toml
for doc in decode_docs(jq_out, json_decoder):
for doc in decode_output_docs(jq_out, json_decoder):
if not isinstance(doc, OrderedDict):
msg = "{}: Error converting JSON to TOML: cannot represent non-object types at top level."
exit_func(msg.format(program_name))
Expand All @@ -210,26 +254,7 @@ def yq(input_streams=None, output_stream=None, input_format="yaml", output_forma
# For Python 3, write the unicode to the buffer directly.
toml.dump(doc, output_stream)
else:
if input_format == "yaml":
loader = get_loader(use_annotations=False)
for input_stream in input_streams:
for doc in yaml.load_all(input_stream, Loader=loader):
json.dump(doc, jq.stdin, cls=JSONDateTimeEncoder)
jq.stdin.write("\n")
elif input_format == "xml":
import xmltodict
for input_stream in input_streams:
json.dump(xmltodict.parse(input_stream.read(), disable_entities=True), jq.stdin)
jq.stdin.write("\n")
elif input_format == "toml":
import toml
for input_stream in input_streams:
json.dump(toml.load(input_stream), jq.stdin)
jq.stdin.write("\n")
else:
raise Exception("Unknown input format")

jq.stdin.close()
jq.wait()
for input_stream in input_streams:
input_stream.close()
Expand Down

0 comments on commit 57e46e2

Please sign in to comment.