forked from its-a-feature/Mythic
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.py
32 lines (28 loc) · 1.2 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from app import apfell, dbloop, apfell_db, db_objects, use_ssl, listen_port, listen_ip, ssl_cert_path, ssl_key_path
import asyncio
import ssl
if __name__ == "__main__":
asyncio.set_event_loop(dbloop)
if use_ssl:
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(ssl_cert_path, keyfile=ssl_key_path)
server = apfell.create_server(host=listen_ip, port=listen_port, ssl=context, debug=False)
else:
server = apfell.create_server(host=listen_ip, port=listen_port, debug=False)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(server)
# thanks to some awesome people at the sanic community forums,
# we can now detect when the bound port is already in use
def callback(fut):
try:
fetch_count = fut.result()
except OSError as e:
print("probably the port set is being used")
fut.get_loop().stop()
task.add_done_callback(callback)
db_objects.database.allow_sync = True
try:
loop.run_until_complete(apfell_db.connect_async(loop=dbloop))
loop.run_forever()
except Exception as e:
loop.stop()