-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbbi_stream.hh
155 lines (120 loc) · 4.67 KB
/
bbi_stream.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#ifndef DPJ_BBI_STREAM_HH_
#define DPJ_BBI_STREAM_HH_
#include <cstdint>
#include <fstream>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <vector>
#include <zlib.h>
#include "dpj/util/block_decompressor.hh"
#include "dpj/http/http_streambuf.hh"
#include "main_header.hh"
#include "total_summary_header.hh"
#include "zoom_header.hh"
#include "r_tree.hh"
#include "bbi_index.hh"
#include "contig_index.hh"
namespace bbi {
// Magic numbers that identify the flavour of a BBI file. stream::open()
// compares the main header's magic field against these: bed_magic selects
// BED and wig_magic selects WIG; any other value is rejected.
constexpr uint32_t bed_magic{0x8789F2EBu};  // BED-type file
constexpr uint32_t wig_magic{0x888FFC26u};  // WIG-type file
class stream : public std::streambuf {
public:
enum { UNKNOWN, BED, WIG } bbi_type;
stream() : bbi_type{UNKNOWN} {}
void open(std::string resource) {
if (resource.substr(0, 7) == "http://") {
resource = resource.substr(7); // inefficient
auto pos = resource.find_first_of('/');
auto host = resource.substr(0, pos);
resource = resource.substr(pos);
input_stream_.reset(
new dpj::http::streambuf(host, "http", resource, false /* debug */));
} else {
std::unique_ptr<std::filebuf> fb{new std::filebuf{}};
fb->open(resource, std::ios_base::in);
if (!fb->is_open())
throw std::runtime_error("remote_bbi: could not open file: " + resource);
input_stream_ = std::move(fb);
}
// Unpacks main header and determines type.
//
unpack(main_header, input_stream_.get());
switch (main_header.magic) {
case bed_magic:
bbi_type = BED;
break;
case wig_magic:
bbi_type = WIG;
break;
default:
throw std::runtime_error("remote_bbi: bad magic number: "+std::to_string(main_header.magic));
}
// Unpacks total summary header
//
input_stream()->pubseekpos(main_header.total_summary_offset);
unpack(total_summary_header, input_stream_.get());
// Unpacks the number of records.
//
input_stream()->pubseekpos(main_header.full_data_offset);
input_stream()->sgetn((char*)&num_records, 8);
// Unpacks the zoom headers.
//
input_stream()->pubseekpos(main_header::byte_size);
for (int i = 0; i < main_header.zoom_levels; ++i) {
zoom_header z_h{0};
unpack(z_h, input_stream_.get());
zoom_headers.push_back(z_h);
}
}
std::streambuf* input_stream() { return input_stream_.get(); }
friend stream& seek(stream& s, r_tree::leaf_node ln) {
s.input_buf.resize(ln.data_size);
s.input_stream()->pubseekpos(ln.data_offset);
std::streamsize n = s.input_stream()->sgetn((char*)s.input_buf.data(), s.input_buf.size());
if (n != ln.data_size)
throw std::runtime_error("file::inflate_records failed to read comp_sz bytes");
if (s.main_header.uncompress_buf_size == 0) {
s.setg((char*)s.input_buf.data(), (char*)s.input_buf.data(),
(char*)s.input_buf.data() + s.input_buf.size());
} else {
auto p = s.decompressor.decompress(s.input_buf.data(),
s.input_buf.data() + s.input_buf.size());
s.setg((char*)p.first, (char*)p.first, (char*)p.second);
}
return s;
}
friend index index(stream& s, int level) {
if (level > s.zoom_headers.size()) {
level = static_cast<int>(s.zoom_headers.size());
std::cerr << "Warning: zoom level requested was greater than available.\n";
}
if (level == 0)
s.input_stream()->pubseekpos(s.main_header.full_index_offset);
else
s.input_stream()->pubseekpos(s.zoom_headers[level - 1].index_offset);
return s.input_stream();
}
friend contig_index ctig_index(stream& s) {
s.input_stream()->pubseekpos(s.main_header.bp_tree_offset);
return s.input_stream();
}
friend void print_header(stream& s, std::ostream& os) {
os << "\n**** main_header ****\n\n";
print(s.main_header, os);
}
friend void print_summary(stream& s, std::ostream& os) {
os << "\n**** total_summary_header ****\n\n";
print(s.total_summary_header, os);
}
friend void print_num_records(stream& s, std::ostream& os) {
os << "num records: " << s.num_records << '\n';
}
std::vector<zoom_header> zoom_headers;
private:
std::unique_ptr<std::streambuf> input_stream_;
main_header main_header;
total_summary_header total_summary_header;
std::size_t num_records;
std::vector<uint8_t> input_buf;
block_decompressor decompressor;
};
}
#endif /* DPJ_BBI_STREAM_HH_ */