2024-11-21 23:26:07 +01:00
|
|
|
"""N8N utils module."""
|
|
|
|
|
2023-04-05 21:57:31 +02:00
|
|
|
import requests
|
|
|
|
|
|
|
|
from app.utils.config import config
|
|
|
|
|
2024-11-24 11:05:22 +01:00
|
|
|
if config.sentry_enabled:
|
|
|
|
import sentry_sdk
|
|
|
|
|
2023-04-05 21:57:31 +02:00
|
|
|
|
|
|
|
def __n8n_post(data: dict) -> bool:
    """Send a JSON payload to the configured N8N webhook.

    The request is a POST against ``config.n8n_webhook_url`` with a
    10 second timeout. Exceptions raised by ``requests`` (timeouts,
    connection errors, ...) are not caught here and propagate to the caller.

    Args:
        data (dict): Payload that is serialised as the JSON request body.

    Returns:
        bool: True if the webhook answered with HTTP 200, else False.
    """
    response: requests.Response = requests.post(
        url=config.n8n_webhook_url,
        headers={"Content-Type": "application/json"},
        json=data,
        timeout=10,
        # TLS certificate verification is switched off for this call.
        # NOTE(review): presumably the webhook uses a self-signed
        # certificate - confirm before enabling verification.
        verify=False,
    )
    # Only an exact 200 counts as success; any other status yields False.
    return response.status_code == 200
|
|
|
|
|
|
|
|
|
|
|
|
def submit_task(summary, description, completion_date, requestor) -> bool:
    """Send a new task to the N8N webhook.

    The task fields are packed into a JSON payload and posted through
    ``__n8n_post``. When Sentry is enabled in the config, the post is
    wrapped in a Sentry transaction named ``submit_task``.

    Args:
        summary (str): Summary of task, sent under the ``title`` key.
        description (str): Description of task.
        completion_date (str): Completion date of task.
        requestor (str): Requestor of task.

    Returns:
        bool: True if the webhook accepted the task, else False.
    """
    payload: dict = {
        "requestor": requestor,
        "title": summary,
        "description": description,
        "completiondate": completion_date,
    }

    if config.sentry_enabled:
        # sentry_sdk is only imported at module level when Sentry is enabled,
        # so it must not be touched on the other path.
        with sentry_sdk.start_transaction(name="submit_task"):
            posted = __n8n_post(data=payload)
        return posted

    return __n8n_post(data=payload)
|
2023-04-20 20:37:01 +02:00
|
|
|
|
|
|
|
|
|
|
|
def __n8n_get(requestor: str) -> bool:
    """Issue the GET request against the N8N webhook for one requestor.

    Exceptions raised by ``requests`` are not caught and propagate.

    Args:
        requestor (str): Value sent as the ``requestor`` query parameter.

    Returns:
        bool: True if the webhook answered with HTTP 200, else False.
    """
    resp: requests.Response = requests.get(
        url=config.n8n_webhook_url,
        headers={"Content-Type": "application/json"},
        timeout=10,
        # TLS certificate verification is switched off for this call.
        # NOTE(review): presumably required by the webhook's certificate
        # setup - confirm before enabling verification.
        verify=False,
        params={"requestor": requestor},
    )
    return resp.status_code == 200


def get_tasks(requestor: str) -> bool:
    """Get tasks from N8N webhook URL.

    Only the HTTP status of the webhook call is reported; the response
    body is discarded. When Sentry is enabled in the config, the request
    runs inside a Sentry transaction named ``get_tasks``.

    Args:
        requestor (str): Requestor of tasks.

    Returns:
        bool: True if successful, else False.
    """
    # Both paths share one request helper so they cannot drift apart.
    if not config.sentry_enabled:
        return __n8n_get(requestor)

    with sentry_sdk.start_transaction(name="get_tasks"):
        _data = __n8n_get(requestor)
    return _data
|