parse.py
import gzip
from collections import defaultdict
from functools import lru_cache
from urllib.parse import urlparse

from clfparser import CLFParser
from fire import Fire
from jinja2 import DictLoader, Environment
from tldextract import extract
from tqdm import tqdm
from user_agents import parse as uaparse

from template import TEMPLATE
# Browser families (as reported by user_agents) that are counted as bots
# even when the library itself does not flag them with is_bot.
BOTS = [
    "Amazonbot", "Applebot", "Bytespider", "FacebookBot", "Twitterbot",
    "FriendlyCrawler", "ISSCyberRiskCrawler", "YisouSpider",
    "Go-http-client", "http.rb", "scalaj-http",
    "Python aiohttp", "Python Requests",
]
@lru_cache(maxsize=None)
def get_hostname(value):
    """Return the hostname of a URL with any leading 'www.' label removed."""
    r = extract(value)
    if not r.suffix:
        return r.domain
    if r.subdomain in ['', 'www']:
        return ".".join((r.domain, r.suffix))
    return ".".join((r.subdomain, r.domain, r.suffix))
@lru_cache(maxsize=None)
def get_link(value):
    """Normalize a referrer URL to 'host/path', dropping 'www.' and a trailing slash."""
    a = urlparse(value)
    netloc = a.netloc[4:] if a.netloc.startswith('www.') else a.netloc
    path = a.path[:-1] if a.path.endswith('/') else a.path
    return netloc + path
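# Illustrative behaviour of the two helpers above (hypothetical inputs, not
# taken from real log data):
#   get_hostname("https://news.example.co.uk/page")  -> "news.example.co.uk"
#   get_hostname("https://www.example.com/")         -> "example.com"
#   get_link("https://www.example.com/blog/")        -> "example.com/blog"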
def generate_html(days, browsers, systems, bots, refs, lowest, html):
    print('Generating HTML...', html)
    env = Environment(loader=DictLoader({'template.html': TEMPLATE}))
    template = env.get_template('template.html')
    with open(html, 'w') as file:
        output = template.render(
            days=sorted(days.items()),
            browsers=sorted(browsers.items(), key=lambda item: len(item[1])),
            systems=sorted(systems.items(), key=lambda item: len(item[1])),
            bots=sorted(bots.items(), key=lambda item: len(item[1])),
            refs=sorted(refs.items(), key=lambda item: len(item[1])),
            lowest=lowest
        )
        file.write(output)
def console_print(days, browsers, systems, bots, refs, lowest):
    def display(key, value):
        if len(value) > lowest:
            print(str(len(value)).rjust(tabs), key)

    # Width (in digits) of the largest count, used to right-align the numbers column.
    dicts = [days, browsers, systems, refs]
    tabs = len(str(max(len(v) for d in dicts for v in d.values())))
    padding = "-" * tabs

    print(padding, 'Days')
    for key, value in sorted(days.items()):
        display(key, value)
    print()
    print(padding, 'Browsers')
    for key, value in sorted(browsers.items(), key=lambda item: len(item[1])):
        display(key, value)
    print()
    print(padding, 'Operating Systems')
    for key, value in sorted(systems.items(), key=lambda item: len(item[1])):
        display(key, value)
    print()
    print(padding, 'Bots')
    for key, value in sorted(bots.items(), key=lambda item: len(item[1])):
        display(key, value)
    print()
    print(padding, 'Referrers')
    for key, value in sorted(refs.items(), key=lambda item: len(item[1])):
        display(key, value)
def parse(gz_path, lowest=0, html="", skip=""):
    """
    Parse a gzipped access log (Common Log Format).

    Args:
        gz_path (str): Path to a gzipped log file
        lowest (int): Hide rows with this many unique IPs or fewer
        html (str): Write an HTML report to this .html file path instead of printing
        skip (str): Comma-separated list of referrer hostnames to skip
    """
    # Each mapping goes from a label to the set of unique client IPs seen for it.
    days = defaultdict(set)
    browsers = defaultdict(set)
    systems = defaultdict(set)
    refs = defaultdict(set)
    bots = defaultdict(set)
    skipped = [i for i in skip.split(',') if i]
    with gzip.open(gz_path, 'rt') as file:
        for line in tqdm(file, unit="l"):
            log = CLFParser.logDict(line)
            ip = log['h']
            # The user-agent and referrer fields are quoted in the log; strip the quotes.
            agent = uaparse(log["Useragent"][1:-1])
            if agent.is_bot or agent.browser.family in BOTS:
                bots[agent.browser.family].add(ip)
            else:
                day = log['time'].strftime('%Y-%m-%d %A')
                days[day].add(ip)
                browsers[agent.browser.family].add(ip)
                systems[agent.os.family].add(ip)
                ref = log["Referer"][1:-1].lower()
                link = get_link(ref)
                hostname = get_hostname(ref)
                if hostname not in skipped:
                    refs[link].add(ip)
    if html:
        generate_html(days, browsers, systems, bots, refs, lowest, html)
    else:
        console_print(days, browsers, systems, bots, refs, lowest)
if __name__ == "__main__":
    Fire(parse)
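# Example invocations through the CLI that Fire generates from parse()'s
# signature (file names below are hypothetical):
#   python parse.py access.log.gz
#   python parse.py access.log.gz --lowest 5 --skip "example.com,localhost"
#   python parse.py access.log.gz --html report.html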