Skip to content

Commit

Permalink
move index scraping to pycurl
Browse files Browse the repository at this point in the history
We are going to use mainly curl and OpenSSL only. It's really dirty right now and needs some refactoring tomorrow, but it works.
  • Loading branch information
noisecode3 committed Oct 21, 2024
1 parent c02f60a commit 0674139
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 35 deletions.
1 change: 1 addition & 0 deletions database/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
data.json
file_info.json
__pycache__
trle_cert.pem
66 changes: 66 additions & 0 deletions database/get_leaf_cert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""This module get the certificate for "broken" servers that don't follow
the standard handshake procedure, that is, they do not send the chain.
curl can still connect to such a server when the leaf is specified, and curl will
by default look for the chain in /etc/ssl/certs, but the requests module
requires a bundle, so one would have to compile the chain into that bundle."""

import sys
import ssl
import socket
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization

def get_certificate(hostname, port=443, timeout=10.0):
    """Fetch the certificate a TLS server presents, without verifying it.

    Verification is switched off on purpose: the goal is only to obtain the
    server's certificate, not to trust the connection.

    Args:
        hostname: Host to connect to; also sent as the SNI server name.
        port: TCP port of the TLS service.
        timeout: Seconds to wait for the TCP connect and the TLS handshake.
            Pass ``None`` to wait indefinitely.

    Returns:
        The parsed certificate object, or ``None`` when the server presented
        no certificate.

    Raises:
        OSError: If connecting or the TLS handshake fails or times out.
    """
    context = ssl.create_default_context()
    # check_hostname has to be cleared first; the ssl module refuses
    # CERT_NONE while hostname checking is still enabled.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    # The timeout is inherited by the wrapped socket, so it also bounds the
    # handshake and keeps an unresponsive server from hanging the script.
    with socket.create_connection((hostname, port), timeout=timeout) as sock:
        with context.wrap_socket(sock, server_hostname=hostname) as ssock:
            # binary_form=True yields the DER bytes of the peer certificate.
            cert_der = ssock.getpeercert(True)

    if not cert_der:
        return None
    return x509.load_der_x509_certificate(cert_der, default_backend())

def get_sha256_fingerprint(cert):
    """Return the raw SHA-256 digest (bytes) of the certificate's DER encoding."""
    hasher = hashes.Hash(hashes.SHA256(), backend=default_backend())
    hasher.update(cert.public_bytes(serialization.Encoding.DER))
    return hasher.finalize()

def get_serial_number_hex(cert):
    """Return the certificate serial number as colon-separated hex bytes.

    Args:
        cert: Object exposing an integer ``serial_number`` attribute.

    Returns:
        Upper-case, big-endian hex pairs joined by ``:``, e.g. ``"01:02:AB"``.
        A serial of zero is rendered as ``"00"``.
    """
    serial = cert.serial_number
    # bit_length() is 0 for a zero serial, which would produce an empty
    # string; always emit at least one byte.
    byte_count = max(1, (serial.bit_length() + 7) // 8)
    return ':'.join(f'{b:02X}' for b in serial.to_bytes(byte_count, 'big'))

def print_certificate_details(cert):
    """Print the fingerprint, serial number, subject and issuer of *cert*."""
    digest = get_sha256_fingerprint(cert)
    digest_text = ':'.join(f'{octet:02X}' for octet in digest)
    serial_text = get_serial_number_hex(cert)

    print(f"SHA-256 fingerprint: {digest_text}")
    print(f"Serial number: {serial_text}")
    print(f"Subject: {cert.subject}")
    print(f"Issuer: {cert.issuer}")
    # Trailing blank line separates this report from later output.
    print()


def run(url):
    """Fetch, print and return the certificate for a supported site.

    Args:
        url: A URL starting with ``https://www.trle.net`` or
            ``https://trcustoms.org``.

    Returns:
        The certificate object produced by ``get_certificate`` (a parsed
        certificate, not raw bytes).

    Exits the process with status 1 when the URL is not one of the supported
    sites or when the server presented no certificate.
    """
    if url.startswith("https://www.trle.net"):
        # NOTE(review): www URLs are fetched from the bare domain - confirm
        # this is intended.
        host = 'trle.net'
    elif url.startswith("https://trcustoms.org"):
        host = 'trcustoms.org'
    else:
        print(f"Unsupported URL: {url}", file=sys.stderr)
        sys.exit(1)

    certificate = get_certificate(host)
    if certificate is None:
        # Without this guard the detail printer would fail on None.
        print(f"No certificate received from {host}", file=sys.stderr)
        sys.exit(1)

    print_certificate_details(certificate)
    return certificate
48 changes: 48 additions & 0 deletions database/ideas.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,54 @@ int main() {
return 0;
}

### C++ ###
Here we download an image with curl. It should be straightforward, but we document it anyway.
#include <iostream>
#include <curl/curl.h>
#include <fstream>

// libcurl write callback: appends the received chunk to the std::ofstream
// passed through CURLOPT_WRITEDATA.
//   contents - pointer to the received data (not NUL-terminated)
//   size     - size of one element
//   nmemb    - number of elements; the chunk is size * nmemb bytes
//   userp    - the std::ofstream* registered with CURLOPT_WRITEDATA
// Returns the number of bytes handled. Returning anything other than
// size * nmemb tells libcurl the write failed and aborts the transfer.
size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp) {
    std::ofstream* out = static_cast<std::ofstream*>(userp);
    const size_t totalSize = size * nmemb;
    out->write(static_cast<const char*>(contents),
               static_cast<std::streamsize>(totalSize));
    // Report a failed stream (disk full, file not open, ...) instead of
    // pretending the data was stored.
    if (!*out) {
        return 0;
    }
    return totalSize;
}

int main() {
CURL* curl;
CURLcode res;
std::ofstream outFile("downloaded_image.jpg", std::ios::binary);

if (!outFile) {
std::cerr << "Failed to open file for writing." << std::endl;
return 1;
}

curl = curl_easy_init();
if (curl) {
// Set the URL
curl_easy_setopt(curl, CURLOPT_URL, "https://data.trcustoms.org/media/level_images/f5b2217a-28b6-4139-a285-174c83efc2c2.png");
// Set the write function
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
// Pass the output file stream to the callback function
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &outFile);
// Perform the request
res = curl_easy_perform(curl);
// Check for errors
if (res != CURLE_OK) {
std::cerr << "curl_easy_perform() failed: " << curl_easy_strerror(res) << std::endl;
}
// Cleanup
curl_easy_cleanup(curl);
} else {
std::cerr << "Failed to initialize CURL." << std::endl;
}

outFile.close();
return 0;
}


Never forget how we can test one function in python:
python3 -c "from index_scrape import get_trle_page; print(get_trle_page(0, True))"

119 changes: 84 additions & 35 deletions database/index_scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@
import socket
import uuid
import time
import json
import logging
import tempfile
from io import BytesIO
from urllib.parse import urlparse, urlencode, parse_qs
from datetime import datetime
import pycurl
from bs4 import BeautifulSoup, Tag
from PIL import Image
import requests
from cryptography import x509
from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

import index_data

CERT = '/etc/ssl/certs/ca-certificates.crt'
MISCONFIGURED_SERVER = False

# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s')
Expand Down Expand Up @@ -59,49 +60,97 @@ def get_response(url, content_type):
sys.exit(1)

max_retries = 3
delay = 20
retries = 0
response = None
curl = None
headers = None
response_buffer = None

while retries < max_retries:
try:
response = requests.get(url, verify=CERT, timeout=5)
response.raise_for_status() # Raises an HTTPError for bad responses (4xx/5xx)
break # Exit loop on success
except requests.exceptions.Timeout:
response_buffer = BytesIO()
headers_buffer = BytesIO()
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)
curl.setopt(pycurl.WRITEDATA, response_buffer)
curl.setopt(pycurl.WRITEHEADER, headers_buffer)

# Set the path to the certificate for SSL/TLS verification
curl.setopt(pycurl.CAINFO, 'trle_cert.pem') # Use your certificate file
headers_list = [
'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept: */*',
'Referer: https://trcustoms.org/' # Change this to the appropriate referrer if needed
]
curl.setopt(pycurl.HTTPHEADER, headers_list)
# Perform the request
curl.perform()

# Get the response code
response_code = curl.getinfo(pycurl.RESPONSE_CODE)

# If the response is not 200 OK, retry
if response_code != 200:
retries += 1
time.sleep(3)
logging.warning(f"Retrying... Response code: {response_code}")
curl.close()
continue

# Get the headers
headers = headers_buffer.getvalue().decode('utf-8')


# Break the loop on success
break

except Exception as e:
retries += 1
logging.error("Request to %s timed out, retrying (%d/%d)...", url, retries, max_retries)
if retries < max_retries:
time.sleep(delay)
else:
logging.error(f"Request failed: {e}")
if retries >= max_retries:
logging.error("Max retries reached. Exiting.")
sys.exit(1)
except requests.exceptions.RequestException as response_error:
logging.error("Failed to retrieve content: %s", response_error)
sys.exit(1)

if response is None:
print("")
if curl is None:
logging.error("No curl instance")
sys.exit(1)

if headers is None:
logging.error("No headers received")
sys.exit(1)

# Get the Content-Type header once and reuse
response_content_type = response.headers.get('Content-Type', '').split(';')[0].strip()

if response_content_type == 'text/html':
return response.text
if response_content_type == 'application/json':
return response.json()
if response_content_type in ['image/jpeg', 'image/png']:
return response.content
if response_content_type == 'application/pkix-cert':
validate_pem(response.text)
return response.content

logging.error("Unexpected content type: %s, expected %s",
response_content_type,
content_type
)
sys.exit(1)
if response_buffer is None:
logging.error("No response received")
sys.exit(1)

# Extract Content-Type from the headers
response_content_type = None
for header in headers.splitlines():
if header.lower().startswith('content-type:'):
response_content_type = header.split(':', 1)[1].split(';')[0].strip()
break

# Validate and return the response based on content type
if response_content_type == 'text/html' and content_type == 'text/html':
response = response_buffer.getvalue().decode('utf-8') # Plain text
curl.close()
return response
elif response_content_type == 'application/json' and content_type == 'application/json':
response = response_buffer.getvalue().decode('utf-8')
curl.close()
return json.loads(response) # Parse and return JSON
elif response_content_type in ['image/jpeg', 'image/png'] and content_type in ['image/jpeg', 'image/png']:
response = response_buffer.getvalue()
curl.close()
return response # Return raw image data
elif response_content_type == 'application/pkix-cert' and content_type == 'application/pkix-cert':
response = response_buffer.getvalue()
curl.close()
# Add custom validation for certificates here if needed
return response # Return raw certificate data
else:
logging.error("Unexpected content type: %s, expected %s",
response_content_type, content_type)
sys.exit(1)


def validate_pem(pem):
Expand Down

0 comments on commit 0674139

Please sign in to comment.