Added checks to ensure API responses are as expected #7
							
								
								
									
										1
									
								
								.github/workflows/ci-branch-main.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.github/workflows/ci-branch-main.yml
									
									
									
									
										vendored
									
									
								
							@@ -7,6 +7,7 @@ on:
 | 
				
			|||||||
    paths-ignore:
 | 
					    paths-ignore:
 | 
				
			||||||
      - 'README.md'
 | 
					      - 'README.md'
 | 
				
			||||||
      - 'LICENSE.md'
 | 
					      - 'LICENSE.md'
 | 
				
			||||||
 | 
					      - 'CHANGELOG.md'
 | 
				
			||||||
      - '.gitignore'
 | 
					      - '.gitignore'
 | 
				
			||||||
      - 'renovate.json'
 | 
					      - 'renovate.json'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								.github/workflows/ci-development.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.github/workflows/ci-development.yml
									
									
									
									
										vendored
									
									
								
							@@ -7,6 +7,7 @@ on:
 | 
				
			|||||||
    paths-ignore:
 | 
					    paths-ignore:
 | 
				
			||||||
      - 'README.md'
 | 
					      - 'README.md'
 | 
				
			||||||
      - 'LICENSE.md'
 | 
					      - 'LICENSE.md'
 | 
				
			||||||
 | 
					      - 'CHANGELOG.md'
 | 
				
			||||||
      - '.gitignore'
 | 
					      - '.gitignore'
 | 
				
			||||||
      - 'renovate.json'
 | 
					      - 'renovate.json'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								.github/workflows/ci-pull-request.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.github/workflows/ci-pull-request.yml
									
									
									
									
										vendored
									
									
								
							@@ -4,6 +4,7 @@ on:
 | 
				
			|||||||
    paths-ignore:
 | 
					    paths-ignore:
 | 
				
			||||||
      - 'README.md'
 | 
					      - 'README.md'
 | 
				
			||||||
      - 'LICENSE.md'
 | 
					      - 'LICENSE.md'
 | 
				
			||||||
 | 
					      - 'CHANGELOG.md'
 | 
				
			||||||
      - '.gitignore'
 | 
					      - '.gitignore'
 | 
				
			||||||
      - 'renovate.json'
 | 
					      - 'renovate.json'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										5
									
								
								CHANGELOG.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								CHANGELOG.md
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,5 @@
 | 
				
			|||||||
 | 
					# 1.1
 | 
				
			||||||
 | 
					- Added checks to ensure API responses are as expected
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# 1.0
 | 
				
			||||||
 | 
					- Initial release
 | 
				
			||||||
@@ -2,4 +2,4 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
"""MODULE: Specifies app version."""
 | 
					"""MODULE: Specifies app version."""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
VERSION = "1.0"
 | 
					VERSION = "1.1"
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -9,8 +9,13 @@ import requests
 | 
				
			|||||||
def get_ip_information(ipv4_address: ipaddress.IPv4Address) -> dict:
 | 
					def get_ip_information(ipv4_address: ipaddress.IPv4Address) -> dict:
 | 
				
			||||||
    """Retrieves information about a given IPv4 address from IP-API.com."""
 | 
					    """Retrieves information about a given IPv4 address from IP-API.com."""
 | 
				
			||||||
    api_endpoint = f"http://ip-api.com/json/{ipv4_address}"
 | 
					    api_endpoint = f"http://ip-api.com/json/{ipv4_address}"
 | 
				
			||||||
    resp = requests.get(api_endpoint).json()
 | 
					    try:
 | 
				
			||||||
    return resp
 | 
					        resp = requests.get(api_endpoint)
 | 
				
			||||||
 | 
					        resp.raise_for_status()
 | 
				
			||||||
 | 
					        ret = resp.json() if resp.json().get("status") == "success" else None
 | 
				
			||||||
 | 
					    except (requests.exceptions.JSONDecodeError, requests.exceptions.HTTPError):
 | 
				
			||||||
 | 
					        ret = None
 | 
				
			||||||
 | 
					    return ret
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def get_autonomous_system_number(as_info: str) -> str:
 | 
					def get_autonomous_system_number(as_info: str) -> str:
 | 
				
			||||||
@@ -22,7 +27,11 @@ def get_autonomous_system_number(as_info: str) -> str:
 | 
				
			|||||||
def get_prefix_information(autonomous_system: int) -> list:
 | 
					def get_prefix_information(autonomous_system: int) -> list:
 | 
				
			||||||
    """Retrieves prefix information about a given autonomous system."""
 | 
					    """Retrieves prefix information about a given autonomous system."""
 | 
				
			||||||
    api_endpoint = f"https://api.hackertarget.com/aslookup/?q={str(autonomous_system)}"
 | 
					    api_endpoint = f"https://api.hackertarget.com/aslookup/?q={str(autonomous_system)}"
 | 
				
			||||||
    resp = requests.get(api_endpoint).text
 | 
					    try:
 | 
				
			||||||
    prefixes = resp.split("\n")
 | 
					        resp = requests.get(api_endpoint)
 | 
				
			||||||
 | 
					        resp.raise_for_status()
 | 
				
			||||||
 | 
					    except requests.exceptions.HTTPError:
 | 
				
			||||||
 | 
					        return None
 | 
				
			||||||
 | 
					    prefixes = resp.text.split("\n")
 | 
				
			||||||
    prefixes.pop(0)
 | 
					    prefixes.pop(0)
 | 
				
			||||||
    return prefixes
 | 
					    return prefixes
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -38,6 +38,9 @@ def main():
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    # Get information from API
 | 
					    # Get information from API
 | 
				
			||||||
    ip_info = get_ip_information(ip_address)
 | 
					    ip_info = get_ip_information(ip_address)
 | 
				
			||||||
 | 
					    if not ip_info:
 | 
				
			||||||
 | 
					        print("ERROR: could not retrieve IP information from API.")
 | 
				
			||||||
 | 
					        sys.exit(1)
 | 
				
			||||||
    as_number = get_autonomous_system_number(ip_info.get("as"))
 | 
					    as_number = get_autonomous_system_number(ip_info.get("as"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Assemble list for table generation
 | 
					    # Assemble list for table generation
 | 
				
			||||||
@@ -56,7 +59,11 @@ def main():
 | 
				
			|||||||
    # If wanted, get prefix information
 | 
					    # If wanted, get prefix information
 | 
				
			||||||
    if args.prefixes:
 | 
					    if args.prefixes:
 | 
				
			||||||
        prefix_info = get_prefix_information(as_number)
 | 
					        prefix_info = get_prefix_information(as_number)
 | 
				
			||||||
        table_data.append(["Prefixes", generate_prefix_string(prefix_info)])
 | 
					        if not prefix_info:
 | 
				
			||||||
 | 
					            print("ERROR: could not retrieve prefix information from API.")
 | 
				
			||||||
 | 
					            sys.exit(1)
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            table_data.append(["Prefixes", generate_prefix_string(prefix_info)])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    print_table(table_data)
 | 
					    print_table(table_data)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2,3 +2,4 @@ black
 | 
				
			|||||||
coverage
 | 
					coverage
 | 
				
			||||||
pylint
 | 
					pylint
 | 
				
			||||||
pytest
 | 
					pytest
 | 
				
			||||||
 | 
					requests-mock
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2,6 +2,8 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
"""MODULE: Provides test cases for app/ip_info.py."""
 | 
					"""MODULE: Provides test cases for app/ip_info.py."""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import requests_mock
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from app.ip_info import (  # pragma: no cover
 | 
					from app.ip_info import (  # pragma: no cover
 | 
				
			||||||
    get_ip_information,
 | 
					    get_ip_information,
 | 
				
			||||||
    get_autonomous_system_number,
 | 
					    get_autonomous_system_number,
 | 
				
			||||||
@@ -16,6 +18,24 @@ def test_get_ip_information() -> None:
 | 
				
			|||||||
    assert ip_info.get("status") == "success" and ip_info.get("query") == test_query
 | 
					    assert ip_info.get("status") == "success" and ip_info.get("query") == test_query
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_get_ip_information_broken_api_response() -> None:
 | 
				
			||||||
 | 
					    """TEST: ensure that None is returned if the IP API response is broken."""
 | 
				
			||||||
 | 
					    test_query = "1.2.3.4"
 | 
				
			||||||
 | 
					    with requests_mock.Mocker() as mocker:
 | 
				
			||||||
 | 
					        mocker.get(f"http://ip-api.com/json/{test_query}", text="error")
 | 
				
			||||||
 | 
					        resp = get_ip_information(test_query)
 | 
				
			||||||
 | 
					        assert not resp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_get_ip_information_bad_response() -> None:
 | 
				
			||||||
 | 
					    """TEST: ensure that None is returned if the IP API returns code 404."""
 | 
				
			||||||
 | 
					    test_query = "1.2.3.4"
 | 
				
			||||||
 | 
					    with requests_mock.Mocker() as mocker:
 | 
				
			||||||
 | 
					        mocker.get(f"http://ip-api.com/json/{test_query}", status_code=404)
 | 
				
			||||||
 | 
					        resp = get_ip_information(test_query)
 | 
				
			||||||
 | 
					        assert not resp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def test_get_autonomous_system_number() -> None:
 | 
					def test_get_autonomous_system_number() -> None:
 | 
				
			||||||
    """TEST: ensure that AS information is parsed into AS number correctly."""
 | 
					    """TEST: ensure that AS information is parsed into AS number correctly."""
 | 
				
			||||||
    as_info = "AS5089 Virgin Media Limited"
 | 
					    as_info = "AS5089 Virgin Media Limited"
 | 
				
			||||||
@@ -28,3 +48,27 @@ def test_get_prefix_information() -> None:
 | 
				
			|||||||
    autonomous_system = "AS109"
 | 
					    autonomous_system = "AS109"
 | 
				
			||||||
    prefixes = get_prefix_information(autonomous_system)
 | 
					    prefixes = get_prefix_information(autonomous_system)
 | 
				
			||||||
    assert "144.254.0.0/16" in prefixes
 | 
					    assert "144.254.0.0/16" in prefixes
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_get_prefix_information_broken_api_response() -> None:
 | 
				
			||||||
 | 
					    """TEST: ensure that None is returned if the prefix API response is broken."""
 | 
				
			||||||
 | 
					    autonomous_system = "AS109"
 | 
				
			||||||
 | 
					    with requests_mock.Mocker() as mocker:
 | 
				
			||||||
 | 
					        mocker.get(
 | 
				
			||||||
 | 
					            f"https://api.hackertarget.com/aslookup/?q={str(autonomous_system)}",
 | 
				
			||||||
 | 
					            text="error",
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        resp = get_prefix_information(autonomous_system)
 | 
				
			||||||
 | 
					        assert not resp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def test_get_prefix_information_bad_response() -> None:
 | 
				
			||||||
 | 
					    """TEST: ensure that None is returned if the prefix API returns code 404."""
 | 
				
			||||||
 | 
					    autonomous_system = "AS109"
 | 
				
			||||||
 | 
					    with requests_mock.Mocker() as mocker:
 | 
				
			||||||
 | 
					        mocker.get(
 | 
				
			||||||
 | 
					            f"https://api.hackertarget.com/aslookup/?q={str(autonomous_system)}",
 | 
				
			||||||
 | 
					            status_code=404,
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        resp = get_prefix_information(autonomous_system)
 | 
				
			||||||
 | 
					        assert not resp
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user