-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLogger.py
71 lines (59 loc) · 2.69 KB
/
Logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import datetime
import base64
import requests
from azure.storage.blob import BlobServiceClient
class Logger():
def __init__(self,credential,logConfig) -> None:
self.config = logConfig
try: # get logged in user
token = credential.get_token('https://graph.microsoft.com/.default')
headers = {'Authorization': 'Bearer ' + token.token}
response = requests.get('https://graph.microsoft.com/v1.0/me', headers=headers)
self.upn = response.json()['userPrincipalName']
except: # give a fake name
self.upn = 'noUser@local.local'
self.blob_service_client = BlobServiceClient(
account_url=f"https://{self.config['storageAccount']}.blob.core.windows.net",
credential=credential
)
self.container_client = self.blob_service_client.get_container_client(
container=self.config['container']
)
def log(self,type:str,message:str):
timestamp = datetime.datetime.utcnow().isoformat(timespec='seconds')
if isinstance(message,dict):
message = str(message)
msg_tab_escaped = message.replace('\t',' ')
bMessage = (f"{timestamp}\t{type}\t{self.upn}\t{msg_tab_escaped}\n").encode()
log_blob = self.blob_service_client.get_blob_client(
self.config['container'],
f"SDAdmin/SDAdmin_{timestamp[:10]}.log"
)
if not log_blob.exists():
log_blob.create_append_blob()
headers = '\t'.join(['timestamp','type','user','message']) + '\n'
log_blob.append_block(headers.encode())
log_blob.append_block(bMessage)
def _tsv2dict(self,tsv:str) -> dict:
lines = tsv.split('\n')
header = lines[0].split('\t')
data_lines = lines[1:-1]
result = []
for line in data_lines:
columns = line.split('\t')
row_dict = {header[i] if i < len(header) else f'Field{i}': col for i, col in enumerate(columns)}
result.append(row_dict)
return result
def list_log_file_names(self,b64:bool=True):
filenames = [b for b in self.container_client.list_blob_names(name_starts_with='SDAdmin/SDAdmin',)]
if b64:
return [base64.b64encode(fname.encode()).decode() for fname in filenames]
else:
return filenames
def get_log_file(self,filename,b64:bool=True):
if b64:
filename = base64.b64decode(filename.encode()).decode()
blobClient = self.container_client.get_blob_client(filename)
blobStr = blobClient.download_blob().readall().decode('utf-8')
blobDict = self._tsv2dict(blobStr)
return blobDict