# frozen_string_literal: true # Service for automatically creating network ranges for unmatched IPs class NetworkRangeGenerator include ActiveModel::Model include ActiveModel::Attributes # Minimum network sizes for different IP types IPV4_MIN_SIZE = 24 # /24 = 256 IPs IPV6_MIN_SIZE = 64 # /64 = 2^64 IPs (standard IPv6 allocation) # Special network ranges to avoid RESERVED_RANGES = [ IPAddr.new('10.0.0.0/8'), # Private IPAddr.new('172.16.0.0/12'), # Private IPAddr.new('192.168.0.0/16'), # Private IPAddr.new('127.0.0.0/8'), # Loopback IPAddr.new('169.254.0.0/16'), # Link-local IPAddr.new('224.0.0.0/4'), # Multicast IPAddr.new('240.0.0.0/4'), # Reserved IPAddr.new('::1/128'), # IPv6 loopback IPAddr.new('fc00::/7'), # IPv6 private IPAddr.new('fe80::/10'), # IPv6 link-local IPAddr.new('ff00::/8') # IPv6 multicast ].freeze # Special network ranges to avoid RESERVED_RANGES = [ IPAddr.new('10.0.0.0/8'), # Private IPAddr.new('172.16.0.0/12'), # Private IPAddr.new('192.168.0.0/16'), # Private IPAddr.new('127.0.0.0/8'), # Loopback IPAddr.new('169.254.0.0/16'), # Link-local IPAddr.new('224.0.0.0/4'), # Multicast IPAddr.new('240.0.0.0/4'), # Reserved IPAddr.new('::1/128'), # IPv6 loopback IPAddr.new('fc00::/7'), # IPv6 private IPAddr.new('fe80::/10'), # IPv6 link-local IPAddr.new('ff00::/8') # IPv6 multicast ].freeze class << self # Find or create a network range for the given IP address def find_or_create_for_ip(ip_address, user: nil) ip_str = ip_address.to_s ip_obj = ip_address.is_a?(IPAddr) ? ip_address : IPAddr.new(ip_str) # Check if IP already matches existing ranges existing_range = NetworkRange.contains_ip(ip_str).first if existing_range # If we have an existing range and it's a /32 (single IP), # create a larger network range instead for better analytics if existing_range.masklen == 32 # Don't overwrite manually created or imported ranges unless %w[manual user_created api_imported].include?(existing_range.source) return create_appropriate_network(ip_obj, user: user) end end return existing_range end # Create the appropriate network range for this IP create_appropriate_network(ip_obj, user: user) end # Get the appropriate minimum network size for an IP def minimum_network_size(ip_address) return IPV6_MIN_SIZE if ip_address.ipv6? # For IPv4, use larger networks for known datacenter/ranges if datacenter_ip?(ip_address) 20 # /20 = 4096 IPs for large providers else IPV4_MIN_SIZE # /24 = 256 IPs for general use end end # Check if IP is in a datacenter range def datacenter_ip?(ip_address) # Known major cloud provider ranges cloud_ranges = [ IPAddr.new('3.0.0.0/8'), # AWS IPAddr.new('52.0.0.0/8'), # AWS IPAddr.new('54.0.0.0/8'), # AWS IPAddr.new('13.0.0.0/8'), # AWS IPAddr.new('104.16.0.0/12'), # Cloudflare IPAddr.new('172.64.0.0/13'), # Cloudflare IPAddr.new('104.24.0.0/14'), # Cloudflare IPAddr.new('172.68.0.0/14'), # Cloudflare IPAddr.new('108.170.0.0/16'), # Google IPAddr.new('173.194.0.0/16'), # Google IPAddr.new('209.85.0.0/16'), # Google IPAddr.new('157.240.0.0/16'), # Facebook/Meta IPAddr.new('31.13.0.0/16'), # Facebook/Meta IPAddr.new('69.63.0.0/16'), # Facebook/Meta IPAddr.new('173.252.0.0/16'), # Facebook/Meta IPAddr.new('20.0.0.0/8'), # Microsoft Azure IPAddr.new('40.64.0.0/10'), # Microsoft Azure IPAddr.new('40.96.0.0/11'), # Microsoft Azure IPAddr.new('40.112.0.0/12'), # Microsoft Azure IPAddr.new('40.123.0.0/16'), # Microsoft Azure IPAddr.new('40.124.0.0/14'), # Microsoft Azure IPAddr.new('40.126.0.0/15'), # Microsoft Azure ] cloud_ranges.any? { |range| range.include?(ip_address) } end private # Create the appropriate network range containing the IP def create_appropriate_network(ip_address, user: nil) prefix_length = minimum_network_size(ip_address) # Create the network range with the IP at the center if possible network_cidr = create_network_with_ip(ip_address, prefix_length) # Check if network already exists existing = NetworkRange.find_by(network: network_cidr) return existing if existing # Create new network range NetworkRange.create!( network: network_cidr, source: 'auto_generated', creation_reason: "auto-generated for unmatched IP traffic", user: user, company: nil, # Will be filled by enrichment job asn: nil, country: nil, is_datacenter: datacenter_ip?(ip_address), is_vpn: false, is_proxy: false ) end # Create a network CIDR that contains the given IP with specified prefix length def create_network_with_ip(ip_address, prefix_length) # Convert IP to integer and apply mask ip_int = ip_address.to_i if ip_address.ipv6? # For IPv6, mask to prefix length mask = (2**128 - 1) ^ ((2**(128 - prefix_length)) - 1) network_int = ip_int & mask result = IPAddr.new(network_int, Socket::AF_INET6).mask(prefix_length) else # For IPv4, mask to prefix length mask = (2**32 - 1) ^ ((2**(32 - prefix_length)) - 1) network_int = ip_int & mask result = IPAddr.new(network_int, Socket::AF_INET).mask(prefix_length) end # Return the CIDR notation result.to_s end end end