"""
Este script é um exemplo de como baixar arquivos de um bucket S3,
zipar e reenviar para o mesmo bucket.
"""
import os
import json
import time
import stat
import shutil
import zipfile
# from concurrent.futures import ThreadPoolExecutor
import boto3
# MAX_THREADS = 100
FAKE_FOLDER = "./data"
FAKE_OUTPUT_DIR = "./output"
QUEUE_NAME = "dev-queue-zip-sample"
QUEUE_RECEIVE_MAX_ATTEMPTS = 3
QUEUE_RECEIVE_MAX_MESSAGES = 1
QUEUE_RECEIVE_TIMEOUT_SECONDS = 3
QUEUE_RECEIVE_VISIBILITY_TIMEOUT = 30
QUEUE_RECEIVE_CURRENT_ATTEMPT = 0
class Message:
    """Represents a message received from the SQS queue."""

    def __init__(self, id: str, zip_name: str, zip_folder: str):
        self.id = id
        self.zip_name = zip_name
        self.zip_folder = zip_folder

    def __repr__(self):
        """Returns the string representation of the message."""
        return f"""
        Message(id={self.id},
                zip_name={self.zip_name},
                zip_folder={self.zip_folder})
        """

    def to_json(self):
        """Returns the JSON representation of the message."""
        return json.dumps(self.__dict__)
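# A minimal sketch of the JSON payload this script expects to find on the queue,
# matching the fields read by deserialize_message below (the values are
# hypothetical examples, not taken from any real bucket):
# {"id": "sample-run", "zip_name": "sample-run.zip", "zip_folder": "sample-run"}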
# boto3.setup_default_session(
# region_name='us-east-1',
# aws_access_key_id='localstack',
# aws_secret_access_key='localstack'
# )
s3 = boto3.client("s3")
sqs = boto3.client("sqs")
queue = sqs.get_queue_url(QueueName=QUEUE_NAME)
queue_url = queue["QueueUrl"]
def download_file(bucket_name, key, current_file_name):
    """
    Downloads a file from an S3 bucket.
    """
    s3.download_file(bucket_name, key, current_file_name)
    print(f" -> Downloading {key} to {current_file_name}...")
def download_s3_files(bucket_name, folder):
    """
    Downloads files from an S3 bucket folder, excluding zip files and directories.
    """
    download_start = time.time()
    print(f" -> Downloading files from {bucket_name}/{folder}...")
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder)
    # tasks = []
    # with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    # with ThreadPoolExecutor() as executor:
    if "Contents" not in response:
        print(" -> ** No files to download")
        return
    for obj in response["Contents"]:
        key = obj["Key"]
        if key.find("zip") != -1:
            continue
        if key.endswith("/"):
            continue
        current_dir = os.path.abspath(f"./{FAKE_FOLDER}/{folder}")
        os.makedirs(current_dir, exist_ok=True)
        current_file_name = os.path.join(current_dir, os.path.basename(key))
        download_file(bucket_name, key, current_file_name)
        # tasks.append(executor.submit(download_file, s3, bucket_name, key, current_file_name))
    # for task in tasks:
    #     task.result()
    # print(" -> Downloaded all files")
    download_end = time.time()
    print(f" -> Download finished in {download_end - download_start:.2f} seconds")
def zip_s3_files(folder, zip_name):
    """
    Zips all files in the specified folder and saves the zip file in the output directory.
    """
    zip_start = time.time()
    print(f" -> Zipping files in {folder}...")
    # Read from the same local folder that download_s3_files wrote to
    current_dir = os.path.abspath(f"./{FAKE_FOLDER}/{folder}")
    os.makedirs(FAKE_OUTPUT_DIR, exist_ok=True)
    zip_file_name = os.path.join(FAKE_OUTPUT_DIR, zip_name)
    with zipfile.ZipFile(zip_file_name, "w") as zipf:
        for root, _, files in os.walk(current_dir):
            for file in files:
                file_name = os.path.join(root, file)
                arc_name = os.path.relpath(file_name, current_dir)
                zipf.write(file_name, arc_name)
                print(f" -> Zipping {file_name} to {arc_name}...")
    zip_end = time.time()
    print(f" -> Zip finished in {zip_end - zip_start:.2f} seconds")
def upload_s3_files(bucket_name, folder, zip_name):
    """Uploads a zip file to an S3 bucket."""
    upload_start = time.time()
    print(f" -> Uploading files to {bucket_name}/{folder}...")
    current_dir = os.path.abspath(FAKE_OUTPUT_DIR)
    zip_file_name = os.path.join(current_dir, zip_name)
    with open(zip_file_name, "rb") as data:
        s3.upload_fileobj(data, bucket_name, f"{folder}/{zip_name}")
        print(
            f" -> Uploading {zip_file_name} to {bucket_name}/{folder}/{zip_name} ..."
        )
    upload_end = time.time()
    print(f" -> Upload finished in {upload_end - upload_start:.2f} seconds")
def deserialize_message(json_payload: str):
    """
    Deserializes a JSON message into a Message object.
    """
    try:
        data = json.loads(json_payload)
        return Message(
            id=data["id"], zip_name=data["zip_name"], zip_folder=data["zip_folder"]
        )
    except (KeyError, json.JSONDecodeError) as e:
        print(f" @@ Error deserializing the message: {e}")
        return None
def handle_remove_readonly(func, path, excinfo):
    """Clears the read-only flag and retries the failed removal."""
    # A folder that does not exist yet is not an error for the cleanup step
    if isinstance(excinfo, FileNotFoundError):
        return
    # Change the permissions to read/write and try again
    os.chmod(path, stat.S_IWRITE)
    func(path)
def process_zip(msg: Message, bucket_name: str):
    """
    Processes a zip message.
    """
    # DEFAULT VARIABLES
    folder = msg.id
    # REMOVE LOCAL FOLDERS (onexc clears read-only files; missing folders are ignored by the handler)
    shutil.rmtree(f"./{FAKE_FOLDER}/{folder}", onexc=handle_remove_readonly)
    shutil.rmtree(f"./{FAKE_OUTPUT_DIR}", onexc=handle_remove_readonly)
    # FETCH ADDITIONAL INFO FROM THE DATABASE
    # XXXXX
    zip_name = msg.zip_name
    # DOWNLOAD THE FILES THAT ARE IN S3
    download_s3_files(bucket_name, folder)
    # ZIP THE FILES THAT WERE DOWNLOADED FROM S3
    zip_s3_files(folder, zip_name)
    # UPLOAD THE ZIP BACK TO S3
    upload_s3_files(bucket_name, folder, zip_name)
    # CREATE / UPDATE THE RECORD IN THE DATABASE
    # XXXXX
# PROCESS MESSAGES FROM THE QUEUE
try:
    BUCKET_NAME = "zip-sample"
    # Receive messages from the queue
    print(f"# Receiving messages from queue: {queue_url}")
    while QUEUE_RECEIVE_CURRENT_ATTEMPT < QUEUE_RECEIVE_MAX_ATTEMPTS:
        print(
            f" + Attempt {QUEUE_RECEIVE_CURRENT_ATTEMPT + 1} of {QUEUE_RECEIVE_MAX_ATTEMPTS}..."
        )
        # Receive messages from the queue
        messages = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=QUEUE_RECEIVE_MAX_MESSAGES,
            WaitTimeSeconds=QUEUE_RECEIVE_TIMEOUT_SECONDS,  # Long-poll wait to receive the message
            VisibilityTimeout=QUEUE_RECEIVE_VISIBILITY_TIMEOUT,  # Message stays invisible to other consumers
        )
        if "Messages" in messages:
            print(f" -> Received {len(messages['Messages'])} message(s).")
            for message in messages["Messages"]:
                print(f" -> Message received: {message['Body']}")
                # Deserialize the message
                msg = deserialize_message(message["Body"])
                if not msg:
                    print(f" @@ Could not deserialize the message. Body: {message['Body']}")
                    continue
                # Process the message
                process_zip(msg, BUCKET_NAME)
                # Delete the message after processing it
                receipt_handle = message["ReceiptHandle"]
                print(f" -> Deleting message from the queue: {receipt_handle}")
                sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
            # Reset the attempt counter
            print(" -> Resetting the attempt counter.")
            QUEUE_RECEIVE_CURRENT_ATTEMPT = 0
        else:
            print(" -> No messages found.")
            QUEUE_RECEIVE_CURRENT_ATTEMPT += 1
except Exception as e:
    print(f"@ Error while processing messages: {e}")
    # do not delete the message from SQS, so it can be retried
else:
    # If there are no messages after 3 attempts, the program finishes
    print("# Finishing the program after 3 attempts without messages.")
finally:
    print("# Shutting down the program...")
# num_vcpus = os.cpu_count()
# print(f"Number of available vCPUs: {num_vcpus}")
# fator_sobrecarga = 3
# def printX(x):
#     print(x)
# max_threads = num_vcpus * fator_sobrecarga
# print(f"Max threads: {max_threads}")
# with ThreadPoolExecutor(max_threads) as executor:
#     executor.map(printX, range(1000))
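# A minimal, commented-out sketch of how a test message could be published so this
# script has something to consume; the id/zip_name/zip_folder values are hypothetical
# examples, not taken from any real bucket.
# test_msg = Message(id="sample-run", zip_name="sample-run.zip", zip_folder="sample-run")
# sqs.send_message(QueueUrl=queue_url, MessageBody=test_msg.to_json())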