Files
baffle-hub/app/services/network_range_generator.rb
Dan Milne 6433f6c5bb Updates
2025-11-14 16:35:49 +11:00

143 lines
5.2 KiB
Ruby

# frozen_string_literal: true
# Service for automatically creating network ranges for unmatched IPs
class NetworkRangeGenerator
include ActiveModel::Model
include ActiveModel::Attributes
# Prefix lengths used when auto-generating a range around an IP.
IPV4_MIN_SIZE = 24 # /24 = 256 IPs
IPV6_MIN_SIZE = 64 # /64 = 2^64 IPs (standard IPv6 allocation)
# Special-purpose networks to avoid, parsed once into frozen IPAddr objects.
# NOTE(review): no method visible in this class consults this list yet --
# confirm whether callers elsewhere rely on it.
RESERVED_RANGES = [
  '10.0.0.0/8',     # Private
  '172.16.0.0/12',  # Private
  '192.168.0.0/16', # Private
  '127.0.0.0/8',    # Loopback
  '169.254.0.0/16', # Link-local
  '224.0.0.0/4',    # Multicast
  '240.0.0.0/4',    # Reserved
  '::1/128',        # IPv6 loopback
  'fc00::/7',       # IPv6 private
  'fe80::/10',      # IPv6 link-local
  'ff00::/8'        # IPv6 multicast
].map { |cidr| IPAddr.new(cidr) }.freeze
class << self
# Find the network range covering an IP address, creating one when needed.
#
# Looks up the first range returned by NetworkRange.contains_ip. When that
# range is only a single-host entry (/32 for IPv4, /128 for IPv6) and was not
# manually created or imported, a wider range is generated instead (better
# for analytics). When no range matches at all, a new one is generated.
#
# @param ip_address [IPAddr, #to_s] the address to look up
# @param user [Object, nil] passed through to create_appropriate_network
# @return the matching range, or the result of create_appropriate_network
# @raise [IPAddr::InvalidAddressError] if a non-IPAddr argument cannot be parsed
def find_or_create_for_ip(ip_address, user: nil)
  ip_str = ip_address.to_s
  ip_obj = ip_address.is_a?(IPAddr) ? ip_address : IPAddr.new(ip_str)

  existing_range = NetworkRange.contains_ip(ip_str).first
  return create_appropriate_network(ip_obj, user: user) unless existing_range

  # A single-host range is /32 only for IPv4; for IPv6 it is /128, and an
  # IPv6 /32 is a very large network that must not be treated as one host.
  single_host_prefix = ip_obj.ipv6? ? 128 : 32
  # Don't supersede manually created or imported ranges.
  protected_sources = %w[manual user_created api_imported]

  widen = existing_range.masklen == single_host_prefix &&
          !protected_sources.include?(existing_range.source)
  widen ? create_appropriate_network(ip_obj, user: user) : existing_range
end
# Pick the prefix length to use when generating a range for an IP.
#
# @param ip_address [IPAddr] the address being classified
# @return [Integer] IPV6_MIN_SIZE for IPv6; for IPv4, 20 when datacenter_ip?
#   reports a match (/20 = 4096 IPs for large providers), otherwise
#   IPV4_MIN_SIZE (/24 = 256 IPs for general use)
def minimum_network_size(ip_address)
  if ip_address.ipv6?
    IPV6_MIN_SIZE
  elsif datacenter_ip?(ip_address)
    20
  else
    IPV4_MIN_SIZE
  end
end
# Check whether an IP falls inside one of a fixed set of cloud-provider
# prefixes.
#
# The prefix list is parsed once and memoized (frozen) instead of building
# 22 IPAddr objects on every call.
# NOTE(review): the provider labels and coarse prefixes below are carried
# over from the original list as heuristics -- verify against the providers'
# published ranges before relying on them.
#
# @param ip_address [IPAddr] the address to test
# @return [Boolean] true when any listed prefix includes the address
def datacenter_ip?(ip_address)
  @cloud_ranges ||= [
    '3.0.0.0/8',      # AWS
    '52.0.0.0/8',     # AWS
    '54.0.0.0/8',     # AWS
    '13.0.0.0/8',     # AWS
    '104.16.0.0/12',  # Cloudflare
    '172.64.0.0/13',  # Cloudflare
    '104.24.0.0/14',  # Cloudflare
    '172.68.0.0/14',  # Cloudflare
    '108.170.0.0/16', # Google
    '173.194.0.0/16', # Google
    '209.85.0.0/16',  # Google
    '157.240.0.0/16', # Facebook/Meta
    '31.13.0.0/16',   # Facebook/Meta
    '69.63.0.0/16',   # Facebook/Meta
    '173.252.0.0/16', # Facebook/Meta
    '20.0.0.0/8',     # Microsoft Azure
    '40.64.0.0/10',   # Microsoft Azure
    '40.96.0.0/11',   # Microsoft Azure
    '40.112.0.0/12',  # Microsoft Azure
    '40.123.0.0/16',  # Microsoft Azure
    '40.124.0.0/14',  # Microsoft Azure
    '40.126.0.0/15'   # Microsoft Azure
  ].map { |cidr| IPAddr.new(cidr) }.freeze

  @cloud_ranges.any? { |range| range.include?(ip_address) }
end
private
# Return the auto-sized network range containing the IP, creating it if no
# range with that exact network exists yet.
#
# The prefix length comes from minimum_network_size and the network string
# from create_network_with_ip. Enrichment fields (company, asn, country) are
# left nil on creation; per the original note, company is filled in later by
# an enrichment job.
#
# @param ip_address [IPAddr] the address the range must contain
# @param user [Object, nil] stored on a newly created range
# @return the existing or newly created NetworkRange
def create_appropriate_network(ip_address, user: nil)
  network_cidr = create_network_with_ip(ip_address, minimum_network_size(ip_address))

  # Reuse an existing range for the same network; otherwise create one.
  NetworkRange.find_by(network: network_cidr) || NetworkRange.create!(
    network: network_cidr,
    source: 'auto_generated',
    creation_reason: "auto-generated for unmatched IP traffic",
    user: user,
    company: nil, # Will be filled by enrichment job
    asn: nil,
    country: nil,
    is_datacenter: datacenter_ip?(ip_address),
    is_vpn: false,
    is_proxy: false
  )
end
# Build the CIDR string ("address/prefix") of the network that contains the
# given IP at the given prefix length.
#
# IPAddr#to_s returns only the address, so the prefix length is appended
# explicitly; without it the result is not CIDR notation and the network
# size is lost. IPAddr#mask does the masking for both IPv4 and IPv6.
#
# @param ip_address [IPAddr] an address inside the desired network
# @param prefix_length [Integer] prefix length (0..32 for IPv4, 0..128 for IPv6)
# @return [String] e.g. "192.0.2.0/24" or "2001:db8:1:2::/64"
# @raise [IPAddr::InvalidPrefixError] if prefix_length is out of range
def create_network_with_ip(ip_address, prefix_length)
  network = ip_address.mask(prefix_length)
  "#{network}/#{prefix_length}"
end
end
end