Skip to content

Commit

Permalink
Add todo. Update state handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
ablakley-r7 committed Jan 29, 2025
1 parent 59fd744 commit 14226a4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .schema import MonitorSiemLogsInput, MonitorSiemLogsOutput, MonitorSiemLogsState, Input, Output, Component, State
from typing import Dict, List, Tuple
from datetime import datetime, timezone, timedelta

import copy

LOG_TYPES = ["receipt", "url protect", "attachment protect"]
MAX_LOOKBACK_DAYS = 7
Expand All @@ -27,28 +27,31 @@ def run(self, params={}, state={}, custom_config={}): # pylint: disable=unused-
self.logger.info(f"TASK: Received State: {state}")
existing_state = state.copy()
try:
run_condition = self.detect_run_condition(state)
# TODO: Additional error handling
run_condition = self.detect_run_condition(state.get("query_config", {}))
self.logger.info(f"TASK: Current run state is {run_condition}")
state = self.update_state(state, custom_config)
self.logger.info(f"NEW STATE: {state}")
now_date = datetime.now(tz=timezone.utc).date()
max_run_lookback_date = self.get_max_lookback_date(now_date, run_condition, bool(custom_config))
if not state:
# Check if all of the three necessary log types are always in the query config
initial_log_type_config = {"caught_up": False}
state = {"query_config": dict.fromkeys(LOG_TYPES, initial_log_type_config.copy())}
self.apply_custom_config(state, custom_config)
query_config = self.prepare_query_params(state.get("query_config"), max_run_lookback_date, now_date)
query_config = self.prepare_query_params(state.get("query_config", {}), max_run_lookback_date, now_date)
logs, query_config = self.get_all_logs(run_condition, query_config)
exit_state, has_more_pages = self.prepare_exit_state(query_config, now_date)
# TODO: Dedupe
self.logger.info(f"TASK: Total logs collected this run {len(logs)}")
exit_state, has_more_pages = self.prepare_exit_state(state, query_config, now_date)
return logs, exit_state, has_more_pages, 200, None
except APIException as error:
raise error
self.logger.info(
f"Error: An API exception has occurred. Status code: {error.status_code} returned. Cause: {error.cause}. Error data: {error.data}."
)
return [], existing_state, False, error.status_code, error
except PluginException as error:
raise error
self.logger.info(f"Error: A Plugin exception has occurred. Cause: {error.cause} Error data: {error.data}.")
return [], existing_state, False, error.status_code, error
except Exception as error:
raise error
self.logger.info(f"Error: Unknown exception has occurred. No results returned. Error Data: {error}")
return [], existing_state, False, 500, PluginException(preset=PluginException.Preset.UNKNOWN, data=error)

Expand All @@ -65,9 +68,24 @@ def detect_run_condition(self, query_config: Dict) -> str:
return PAGINATION_RUN
return SUBSEQUENT_RUN

def get_max_lookback_date(
self, now_date: datetime, run_condition: str, custom_config: bool
) -> datetime:
def update_state(self, state: Dict, custom_config: Dict) -> Dict:
"""
Initialise state, validate state, apply custom config
:param state:
:param custom_config:
:return:
"""
initial_log_type_config = {"caught_up": False}
if not state:
state = {"query_config": {log_type: copy.deepcopy(initial_log_type_config) for log_type in LOG_TYPES}}
self.apply_custom_config(state, custom_config)
else:
for log_type in LOG_TYPES:
if log_type not in state.get("query_config", {}).keys():
state["query_config"][log_type] = copy.deepcopy(initial_log_type_config)
return state

def get_max_lookback_date(self, now_date: datetime, run_condition: str, custom_config: bool) -> datetime:
"""
Get max lookback date for run condition
:param now_date:
Expand All @@ -79,7 +97,7 @@ def get_max_lookback_date(
if run_condition in [INITIAL_RUN] and not custom_config:
max_run_lookback_days = INITIAL_MAX_LOOKBACK_DAYS

max_run_lookback_date = (now_date - timedelta(days=max_run_lookback_days))
max_run_lookback_date = now_date - timedelta(days=max_run_lookback_days)
return max_run_lookback_date

def apply_custom_config(self, state: Dict, custom_config: Dict) -> None:
Expand All @@ -89,6 +107,7 @@ def apply_custom_config(self, state: Dict, custom_config: Dict) -> None:
:param custom_config:
:return: N/A
"""
# TODO: Additional custom config for page size, thread size, limit
current_query_config = state.get("query_config")
for log_type, lookback_date_string in custom_config.items():
self.logger.info(f"TASK: Supplied lookback date of {lookback_date_string} for {log_type} log type")
Expand All @@ -104,6 +123,8 @@ def prepare_query_params(self, query_config: Dict, max_lookback_date: Dict, now_
"""
for log_type, log_type_config in query_config.items():
query_date_str = log_type_config.get("query_date")
self.logger.info(f"PREPPING {log_type_config}")
self.logger.info(f"{log_type}, {query_date_str}")
if query_date_str:
query_date = datetime.strptime(query_date_str, "%Y-%m-%d").date()
if not query_date_str:
Expand Down Expand Up @@ -154,22 +175,21 @@ def get_all_logs(self, run_condition: str, query_config: Dict) -> Tuple[List, Di
self.logger.info(f"TASK: Query for {log_type} is caught up. Skipping as we are currently paginating")
return complete_logs, query_config

def prepare_exit_state(self, state: dict, now_date: datetime) -> Tuple[Dict, bool]:
def prepare_exit_state(self, state: dict, query_config: dict, now_date: datetime) -> Tuple[Dict, bool]:
"""
Prepare state and pagination for task completion. Format date time.
:param state:
:param query_config:
:param now_date:
:return: state, has_more_pages
"""
has_more_pages = False
for log_type_config in state.values():
for log_type_config in query_config.values():
query_date = log_type_config.get("query_date")
if log_type_config.get("caught_up") is True:
if isinstance(query_date, str):
query_date = datetime.strptime(query_date, "%Y-%m-%d").date()
if (not log_type_config.get("caught_up")) or query_date < now_date:
has_more_pages = True
if query_date:
if isinstance(query_date, str):
query_date = datetime.strptime(query_date, "%Y-%m-%d").date()
if query_date < now_date:
has_more_pages = True
log_type_config["query_date"] = query_date.strftime("%Y-%m-%d")
log_type_config["query_date"] = query_date.strftime("%Y-%m-%d")
state["query_config"] = query_config
return state, has_more_pages
2 changes: 2 additions & 0 deletions plugins/mimecast_v2/icon_mimecast_v2/util/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def get_siem_batches(self, log_type: str, query_date: str, next_page: str, page_
return urls, batch_response.get("@nextPage"), caught_up

def get_siem_logs_from_batch(self, url: str):
# TODO: Threading
response = requests.request(method=GET, url=url, stream=False)
with gzip.GzipFile(fileobj=BytesIO(response.content), mode="rb") as file_:
logs = []
Expand All @@ -89,6 +90,7 @@ def make_api_request(
if auth:
headers["Authorization"] = f"Bearer {self.access_token}"
request = Request(url=url, method=method, headers=headers, params=params, data=data, json=json)
# TODO: Handle rate limit, handle retry backoff
try:
response = make_request(
_request=request,
Expand Down

0 comments on commit 14226a4

Please sign in to comment.