-
Notifications
You must be signed in to change notification settings - Fork 60
/
Copy pathdata_loader.py
200 lines (162 loc) · 6.48 KB
/
data_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# Copyright 2021 Fuzz Introspector Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reads the data output from the fuzz introspector LLVM plugin."""
import os
import json
import logging
import multiprocessing
from typing import (
Any,
Dict,
List,
Optional,
)
from fuzz_introspector import constants
from fuzz_introspector import utils
from fuzz_introspector.datatypes import (fuzzer_profile, bug)
logger = logging.getLogger(name=__name__)
def read_fuzzer_data_file_to_profile(
cfg_file: str,
language: str) -> Optional[fuzzer_profile.FuzzerProfile]:
"""
For a given .data file (CFG) read the corresponding .yaml file
This is a bit odd way of doing it and should probably be improved.
"""
logger.info(" - loading %s", cfg_file)
target_data_f = cfg_file
if cfg_file.endswith('.txt'):
target_data_f = '/'.join(cfg_file.split('/')[:-1]) + '/report'
logging.info('target data f: %s' % (target_data_f))
if not os.path.isfile(target_data_f) and not os.path.isfile(target_data_f +
".yaml"):
logger.info('R1')
return None
data_dict_yaml = utils.data_file_read_yaml(target_data_f + ".yaml")
# Must be dictionary
if data_dict_yaml is None or not isinstance(data_dict_yaml, dict):
logger.info('Found no data yaml file')
if os.path.isfile('report.yaml'):
data_dict_yaml = utils.data_file_read_yaml('report.yaml')
if data_dict_yaml is None or not isinstance(data_dict_yaml, dict):
logger.info('Report.yaml is not a valid yaml file')
return None
else:
logger.info('Found no module yaml files')
return None
profile = fuzzer_profile.FuzzerProfile(cfg_file, data_dict_yaml, language)
if not profile.has_entry_point():
logger.info("Found no entrypoints")
logger.info("Returning profile")
return profile
def _load_profile(data_file: str, language: str, manager, semaphore=None):
"""Internal function used for multithreaded profile loading"""
if semaphore is not None:
semaphore.acquire()
profile = read_fuzzer_data_file_to_profile(data_file, language)
if profile is not None:
manager[data_file] = profile
else:
logger.error('profile is none')
if semaphore is not None:
semaphore.release()
def load_all_debug_files(target_folder: str):
"""Loads all .debug_info files"""
debug_info_files = utils.get_all_files_in_tree_with_regex(
target_folder, ".*debug_info$")
for file in debug_info_files:
print("debug info file: %s", file)
return debug_info_files
def find_all_debug_all_types_files(target_folder: str):
"""Loads all .debug_info files"""
debug_info_files = utils.get_all_files_in_tree_with_regex(
target_folder, ".*debug_all_types$")
for file in debug_info_files:
print("debug info file: %s", file)
return debug_info_files
def find_all_debug_function_files(target_folder: str):
"""Loads all debug_all_functions files"""
debug_info_files = utils.get_all_files_in_tree_with_regex(
target_folder, ".*debug_all_functions$")
for file in debug_info_files:
print("debug info file: %s", file)
return debug_info_files
def load_all_profiles(
target_folder: str,
language: str,
parallelise: bool = True) -> List[fuzzer_profile.FuzzerProfile]:
"""Loads all profiles in target_folder in a multi-threaded manner"""
if language == "jvm":
# Java targets tend to be quite large, so we try to avoid memory
# exhaustion here.
semaphore_count = 3
else:
semaphore_count = 6
profiles = []
data_files = utils.get_all_files_in_tree_with_regex(
target_folder, "fuzzerLogFile.*\.data$")
data_files.extend(
utils.get_all_files_in_tree_with_regex(target_folder,
"fuzzer-calltree-*"))
target_calltrees = utils.get_all_files_in_tree_with_regex(
target_folder, "targetCalltree.txt$")
logger.info(target_calltrees)
data_files.extend(target_calltrees)
logger.info(" - found %d profiles to load", len(data_files))
if parallelise:
manager = multiprocessing.Manager()
semaphore = multiprocessing.Semaphore(semaphore_count)
return_dict = manager.dict()
jobs = []
for data_file in data_files:
p = multiprocessing.Process(target=_load_profile,
args=(data_file, language, return_dict,
semaphore))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
for v in return_dict.values():
profiles.append(v)
else:
return_dict_gen: Dict[Any, Any] = dict()
for data_file in data_files:
_load_profile(data_file, language, return_dict_gen, None)
for v in return_dict_gen.values():
profiles.append(v)
return profiles
def try_load_input_bugs() -> List[bug.Bug]:
"""Loads input bugs as list. Returns empty list if none"""
if not os.path.isfile(constants.INPUT_BUG_FILE):
return []
return load_input_bugs(constants.INPUT_BUG_FILE)
def load_input_bugs(bug_file: str) -> List[bug.Bug]:
input_bugs: List[bug.Bug] = []
if not os.path.isfile(bug_file):
return input_bugs
# Read file line by line
with open(bug_file, "r") as f:
data = json.load(f)
if not isinstance(data, dict):
return input_bugs
if "bugs" not in data:
return input_bugs
for bug_dict in data["bugs"]:
try:
ib = bug.Bug(bug_dict['source_file'], bug_dict['source_line'],
bug_dict['function_name'], bug_dict['fuzzer_name'],
bug_dict['description'], bug_dict['bug_type'])
input_bugs.append(ib)
except Exception:
continue
return input_bugs