import argparse
import json
import re
import time
import urllib.parse

import requests
from urllib3.exceptions import InsecureRequestWarning

# IPv4 or IPv6 subnet in CIDR notation (e.g. 10.0.0.0/8, 2001:db8::/32).
_CIDR_RE = re.compile(
    r"^(?:(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2})|(?:[0-9a-fA-F\:]{2,39}/[0-9]{1,2}))$")
# Single IPv4 or IPv6 address.
_SINGLE_IP_RE = re.compile(
    r"^(?:(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F\:]{2,39}))$")
# Comment lines (';' or '#') and lines starting with whitespace (incl. blank lines).
_SKIP_RE = re.compile(r"^[;#\s]")


def _build_query(ip, window_start, window_end):
    """Build the URL-encoded ClickHouse query for one blacklist entry.

    Args:
        ip: A single IP address or a CIDR subnet, as read from a blacklist file.
        window_start: Unix timestamp (seconds) — start of the query window.
        window_end: Unix timestamp (seconds) — end of the query window.

    Returns:
        The percent-encoded query string, or None when *ip* matches neither
        the single-address nor the CIDR pattern.

    Note: *ip* is interpolated into SQL, but only after it has matched one of
    the strict address/CIDR regexes above, which admit no quote or space
    characters — so injection via the blacklist file is not possible.
    """
    if _CIDR_RE.match(ip):
        predicate = (f"isIPAddressInRange(IP_SRC, '{ip}') "
                     f"OR isIPAddressInRange(IP_DST, '{ip}')")
    elif _SINGLE_IP_RE.match(ip):
        predicate = f"IP_SRC = '{ip}' OR IP_DST = '{ip}'"
    else:
        return None
    sql = (
        "SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base "
        f"WHERE ({predicate}) "
        f"AND DATE_PKT_MS_FIRST >= toDateTime64({window_start}, 3) "
        f"AND DATE_PKT_MS_FIRST <= toDateTime64({window_end}, 3) FORMAT JSON"
    )
    return urllib.parse.quote(sql, safe='()*')


def main():
    """Compare IP blacklist files against IOTA flow metadata and report matches.

    For every address/subnet in every input file, queries the device's
    ClickHouse datasource proxy for flows seen within the configured time
    window, prints a per-entry verdict, and a per-file match summary.
    """
    parser = argparse.ArgumentParser(
        description='Compare IP blacklist files against IOTA metadata. '
                    'Blacklist files must contain a list of single IPv4/IPv6 '
                    'addresses or IP subnet in CIDR notation.')
    # Required arguments
    parser.add_argument('-d', '--device_ip', required=True,
                        help='Device IP address')
    parser.add_argument('-i', '--infile', required=True, nargs='+',
                        help='Blacklist file(s)')
    # Optional arguments
    parser.add_argument('-u', '--device_username', default='admin',
                        help='Device IP username (default: admin)')
    parser.add_argument('-p', '--device_password', default='admin',
                        help='Device IP password (default: admin)')
    parser.add_argument('-l', '--query_time_window_s', type=int, default=600,
                        help='Number of seconds from the current time to query '
                             'IOTA metadata (default: 600)')
    args = parser.parse_args()

    query_time_window_end = int(time.time())
    query_time_window_start = query_time_window_end - args.query_time_window_s

    results = {}
    session = requests.Session()
    session.auth = (args.device_username, args.device_password)

    # The device uses a self-signed certificate (verify=False below); suppress
    # the resulting urllib3 warning noise.
    requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

    for blacklist_file in args.infile:
        count_fail = 0
        print(f' - {blacklist_file}')
        with open(blacklist_file) as f:
            for line in f:
                # Skip comments or empty lines
                if _SKIP_RE.match(line):
                    continue
                parts = line.split()
                if not parts:
                    # A completely empty line (e.g. a final line with no
                    # newline) slips past the skip regex; guard against
                    # IndexError on parts[0].
                    continue
                ip = parts[0]
                print(f' - {ip:45}', end='')

                query = _build_query(ip, query_time_window_start,
                                     query_time_window_end)
                if query is None:
                    print('Unhandled case')
                    continue

                # Query the datasource proxy; a timeout keeps one dead device
                # from hanging the whole run, and raise_for_status surfaces
                # auth/HTTP errors clearly instead of as a JSON decode error.
                response = session.get(
                    f'https://{args.device_ip}/api/datasources/proxy/3/'
                    f'?query={query}',
                    verify=False, timeout=30)
                response.raise_for_status()

                # ClickHouse FORMAT JSON serializes UInt64 counts as JSON
                # strings, hence the string comparison against "0".
                matching_flows = response.json()['data'][0]['matching_flows']
                if matching_flows == "0":
                    print('Ok')
                else:
                    count_fail += 1
                    print(f'Match (flows: {matching_flows})')

        # Record results after completing file
        results[blacklist_file] = count_fail

    # Print results
    print('\nResults\n===')
    for file_name, match_count in results.items():
        print(f' - {file_name:47}{match_count} matches')


if __name__ == '__main__':
    main()