Detecting blacklisted IP addresses through IOTA API

IOTA devices provide a RESTful API which allows direct access to the data stored on the device. This can be useful for integrating into various scenarios. In this case, to filter for blacklisted IPs in the currently captured content. Use cases may be to analyze if certain internal IPs are not propagated beyond a specific access level, or if certain devices are being accessed from an area where they should not be accessed from.

  • Basic knowledge of Python programming.
  • Access to an IOTA device from Profitap.
  • Connect the IOTA device to the network segment you want to monitor.
  • Identifying the correct segment is critical here. Ensure, with a visual inspection of the traffic on the IOTA device, that the correct segment is being used during capture.
  • Open a web browser and enter the IP address of the IOTA device.
    • Example: https://<iota_device_ip_address> (Replace with the actual IP address of your IOTA device).
  • Log in using the correct credentials. This script uses a username/password combination that should allow it to see the necessary information.
  • Set up the network interfaces for packet capture.
    • Choose the interfaces you want to monitor from the device's web interface.
  • Configure filters if needed to focus on specific traffic types.
    • You can set filters to capture only specific types of traffic, such as HTTP or FTP, depending on your needs.

The query engine allows any user to interact with the IOTA device in a machine-readable fashion to query the metadata database. This is done using a RESTful API access with a username/password combination.

To ensure that sensitive information is not propagated and that no information on the device can be manipulated by the user, it is recommended to create a new special user. In this case, a user has been created with the username “apitest”. Note that this user has been automatically added to the organization as a “Viewer”. Nothing else needs to be configured, and this role will prevent any miscommunication by only allowing read-only access.

The solution used here is for creating a dedicated user/password combination for using the script. As it is recommended to protect direct access to the IOTA from any outside interference and to use a HTTPS connection, this is fine. However, please note that it is a good practice to ensure that the user/password combination is not used in any other context and the user has only a Viewer role (which a new user has by default).

The script:

iota_ip_blacklist_check.py
import requests
import re
import time
import urllib.parse
import json
import argparse
from urllib3.exceptions import InsecureRequestWarning
 
def main():
    # CLI options
    parser = argparse.ArgumentParser(description='Compare IP blacklist files against IOTA metadata. Blacklist files must contain a list of single IPv4/IPv6 addresses or IP subnet in CIDR notation.')
    required_args = parser.add_argument_group('required')
 
    required_args.add_argument('-d', '--device_ip', help='Device IP address')
    required_args.add_argument('-i', '--infile', nargs='+', help = 'Blacklist file(s)')
    parser.add_argument('-u', '--device_username', default='admin', help='Device IP username (default: admin)')
    parser.add_argument('-p', '--device_password', default='admin', help='Device IP password (default: admin)')
    parser.add_argument('-l', '--query_time_window_s', type=int, default=600, help='Number of seconds from the current time to query IOTA metadata (default: 600)')
 
    args = parser.parse_args()
    if args.device_ip:
        iota_ip_address = args.device_ip
    if args.device_username:
        iota_username = args.device_username
    if args.device_password:
        iota_password = args.device_password
    if args.query_time_window_s:
        query_time_window_s = args.query_time_window_s
    if args.infile:
        blacklist_files = args.infile
 
    query_time_window_end = int(time.time())
    query_time_window_start = query_time_window_end - query_time_window_s
 
    results = {}
 
    session = requests.Session()
    session.auth = (iota_username, iota_password)
 
    # Suppress warnings from urllib3
    requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
 
    # Open blacklist files, iterate over them
    for blacklist_file in blacklist_files:
        count_fail = 0
        print(f' - {blacklist_file}')
        with open(blacklist_file) as f:
            for line in f:
 
                # Skip comments or empty lines
                if re.match(r"^[;#\s]", line):
                    continue
 
                # Current IP address or subnet
                ip = line.split()[0]
                print(f'   - {ip:45}', end='')
 
                # Handle CIDR subnet query
                if re.match(r"^(?:(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2})|(?:[0-9a-fA-F\:]{2,39}/[0-9]{1,2}))$", ip):
                    query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (isIPAddressInRange(IP_SRC, '{ip}') OR isIPAddressInRange(IP_DST, '{ip}')) AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*')
                    response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False)
                    matching_flows = json.loads(response.content)['data'][0]['matching_flows']
 
                    if matching_flows == "0":
                        print('Ok')
                    else:
                        count_fail += 1
                        print(f'Match (flows: {matching_flows})')
 
                # Handle single IP address query
                elif re.match(r"^(?:(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F\:]{2,39}))$", ip):
                    query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (IP_SRC = '{ip}' OR IP_DST = '{ip}') AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*')
                    response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False)
                    matching_flows = json.loads(response.content)['data'][0]['matching_flows']
 
                    if matching_flows == "0":
                        print('Ok')
                    else:
                        count_fail += 1
                        print(f'Match (flows: {matching_flows})')
 
                # Some unknown case
                else:
                    print('Unhandled case')
 
        # Record results after completing file
        results[blacklist_file] = count_fail
 
    # Print results
    print('\nResults\n===')
    for file in results:
        print(f' - {file:47}{str(results[file])} matches')
 
if __name__ == '__main__':
    main()

1. Import necessary libraries

import requests
import re
import time
import urllib.parse
import json
import argparse
from urllib3.exceptions import InsecureRequestWarning

2. Set up configuration

    # CLI options
    parser = argparse.ArgumentParser(description='Compare IP blacklist files against IOTA metadata. Blacklist files must contain a list of single IPv4/IPv6 addresses or IP subnet in CIDR notation.')
    required_args = parser.add_argument_group('required')
 
    required_args.add_argument('-d', '--device_ip', help='Device IP address')
    required_args.add_argument('-i', '--infile', nargs='+', help = 'Blacklist file(s)')
    parser.add_argument('-u', '--device_username', default='apitest', help='Device IP username (default: apitest)')
    parser.add_argument('-p', '--device_password', default='apitest', help='Device IP password (default: apitest)')
    parser.add_argument('-l', '--query_time_window_s', type=int, default=600, help='Number of seconds from the current time to query IOTA metadata (default: 600)')
  • -d / –device_ip
    The IP of the IOTA device.
  • -i / –infile
    A text file with IP addresses to look for. CIDR subnet queries for address ranges may also be provided. The file may contain comments prefixed with either a ; or #. Any empty line or lines started with spaces will be omitted. By providing this parameter multiple times, multiple input files may be handed over to this script.
  • -u / –device_username
    The username defined in the step above.
  • -p / –device_password
    The password defined in the step above.
  • -l / –query_time_window_s
    How much time in the past should the query occur. The default is 600 seconds, which corresponds to 10 minutes.

The input file:

As an example, here is an input file, which will search for a specific subnet as well as all the IP addresses for the internal DNS server.

infile.txt
# IP Address of DNS server
192.168.1.250

# Host range for internal testing
10.40.0.0/24

3. Main logic to detect blacklisted IPs

The main logic opens the blacklist file, parses it step by step, and queries the internal database for either a corresponding subnet query or a direct IP address query.

query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (isIPAddressInRange(IP_SRC, '{ip}') OR isIPAddressInRange(IP_DST, '{ip}')) AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*')
                    response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False)
                    matching_flows = json.loads(response.content)['data'][0]['matching_flows']

First, the query is set up. In this case, we only need to look for the number of entries found, which is why we are performing a COUNT operation. The database is queried in SQL format.

After the query has been created, a session is set up, which will use an HTTP GET operation to query the information. The result is presented as JSON data, and the first entry is being read.

For each IP address found, the number of matching entries is printed, and for each file, the total number of matching entries is printed.

It is recommended that the script is run in a Python environment. This can be done with the following commands:

python3 -m venv ./Development/iota
source ./Development/iota/bin/activate
python3 -m pip install argparse requests

This will create a Python3 venv environment (see here for more information) and install the required libraries that are not present in a default environment.

Then, assuming the Python script, as well as the infile file, is in the current directory, the following will run the script:

In this case, the IOTA was positioned to capture on the outbound WAN connection. In the last 10 minutes, no outbound DNS query was done from the internal DNS server, but 4 times, the internal test network sent data over the WAN connection.

In this example, the script is being used as a whitelist test, but it can easily be configured as a blacklist test as well.

By following this guide, you can effectively utilize the IOTA device and its REST API to monitor network traffic and detect blacklisted IP addresses using Python. This approach provides a powerful tool for maintaining network security in an automated monitoring environment.

  • Last modified: May 21, 2024