Files
baffle-hub/app/models/ipv4_range.rb

172 lines
4.9 KiB
Ruby

# frozen_string_literal: true
# Ipv4Range - Stores IPv4 network ranges with IP intelligence metadata
#
# Optimized for fast range lookups using network_start/network_end integers.
# Stores metadata about IP ranges including ASN, company, geographic info,
# and flags for datacenter/proxy/VPN detection.
class Ipv4Range < ApplicationRecord
  # Virtual attribute for setting the range via CIDR notation (e.g. "10.0.0.0/8").
  # When present, network_start / network_end / network_prefix are derived from
  # it before validation (see #calculate_range).
  attr_accessor :cidr

  # Validations
  validates :network_start, presence: true
  validates :network_end, presence: true
  validates :network_prefix, presence: true,
                             numericality: { greater_than_or_equal_to: 0, less_than_or_equal_to: 32 }

  # Callbacks
  before_validation :calculate_range, if: -> { cidr.present? }

  # Scopes for common queries
  scope :datacenter, -> { where(is_datacenter: true) }
  scope :proxy, -> { where(is_proxy: true) }
  scope :vpn, -> { where(is_vpn: true) }
  scope :by_country, ->(country) { where(ip_api_country: country) }
  scope :by_company, ->(company) { where(company: company) }
  scope :by_asn, ->(asn) { where(asn: asn) }

  # Find ranges that contain a specific IPv4 address.
  #
  # ip_string - the address to look up, in any form IPAddr.new accepts.
  #
  # Returns a relation ordered by network_prefix descending, so the most
  # specific (smallest) matching range comes first.
  # Raises ArgumentError if the address is not IPv4. IPAddr's own errors for
  # malformed input (subclasses of ArgumentError) propagate unchanged.
  def self.contains_ip(ip_string)
    ip_addr = IPAddr.new(ip_string)
    raise ArgumentError, "Not an IPv4 address" unless ip_addr.ipv4?

    where("? BETWEEN network_start AND network_end", ip_addr.to_i)
      .order(network_prefix: :desc) # Most specific first
  end

  # Look up the country for an IP via the most specific range containing it.
  #
  # Returns whatever #geo_lookup_country returns for that range, or nil when
  # no stored range contains the address.
  def self.lookup_country_by_ip(ip_string)
    range = contains_ip(ip_string).first
    return nil unless range

    range.geo_lookup_country
  end

  # Enrich ranges that have no country data in either country column.
  #
  # limit - maximum number of ranges to consider (default 1000).
  #
  # Returns the number of ranges that were updated.
  def self.enrich_missing_geo_data(limit: 1000)
    ranges_without_geo = where(ip_api_country: [nil, ''], geo2_country: [nil, ''])
                         .limit(limit)
    updated_count = 0
    # NOTE(review): this calls lookup_country on a GeoIpService *instance*,
    # while #geo_lookup_country and #geo_lookup_country! call it on the class.
    # Both call styles are preserved as found — confirm which one GeoIpService
    # actually supports.
    geo_service = GeoIpService.new

    ranges_without_geo.find_each do |range|
      # Use the first IP in the range for lookup
      sample_ip = IPAddr.new(range.network_start, Socket::AF_INET).to_s
      country = geo_service.lookup_country(sample_ip)
      next unless country.present?

      range.update!(ip_api_country: country, geo2_country: country)
      updated_count += 1
    end

    updated_count
  end

  # Check if this range contains a specific IP.
  #
  # Returns false (rather than raising) when the input cannot be parsed as an
  # IP address, is not IPv4, or when this record's bounds are not set yet.
  def contains_ip?(ip_string)
    return false if network_start.nil? || network_end.nil?

    ip_addr = IPAddr.new(ip_string)
    return false unless ip_addr.ipv4?

    ip_addr.to_i.between?(network_start, network_end)
  rescue IPAddr::Error
    # Malformed or nil input is simply "not contained" for a predicate.
    false
  end

  # Get CIDR notation for this range ("a.b.c.d/prefix"), or nil when
  # network_start is not set.
  def to_cidr
    return nil unless network_start.present?

    "#{first_ip_string}/#{network_prefix}"
  end

  # String representation: the CIDR form, falling back to "Ipv4Range#<id>".
  def to_s
    to_cidr || "Ipv4Range##{id}"
  end

  # Convenience methods for JSON fields.
  # Returns the parsed abuser_scores, or {} when the column is nil or does not
  # contain valid JSON.
  def abuser_scores_hash
    abuser_scores ? JSON.parse(abuser_scores) : {}
  rescue JSON::ParserError
    {}
  end

  # Serializes +hash+ to JSON and stores it in abuser_scores.
  def abuser_scores_hash=(hash)
    self.abuser_scores = hash.to_json
  end

  # Returns the parsed additional_data, or {} when the column is nil or does
  # not contain valid JSON.
  def additional_data_hash
    additional_data ? JSON.parse(additional_data) : {}
  rescue JSON::ParserError
    {}
  end

  # Serializes +hash+ to JSON and stores it in additional_data.
  def additional_data_hash=(hash)
    self.additional_data = hash.to_json
  end

  # GeoIP lookup that persists the result.
  #
  # Does nothing (returns nil) when country information is already stored.
  # Otherwise looks up the first IP of the range and, when a country is found,
  # writes it to both country columns and returns it.
  # Any error is logged and nil is returned.
  def geo_lookup_country!
    return if has_country_info?

    country = GeoIpService.lookup_country(first_ip_string)
    return unless country.present?

    # Update both country fields for redundancy
    update!(ip_api_country: country, geo2_country: country)
    country
  rescue => e
    Rails.logger.error "Failed to lookup geo location for IPv4 range #{id}: #{e.message}"
    nil
  end

  # GeoIP lookup that does not persist anything.
  #
  # Returns the stored country when present (ip_api_country first, then
  # geo2_country); otherwise the result of a lookup on the first IP of the
  # range. Any error is logged and nil is returned.
  def geo_lookup_country
    return ip_api_country if ip_api_country.present?
    return geo2_country if geo2_country.present?

    GeoIpService.lookup_country(first_ip_string)
  rescue => e
    Rails.logger.error "Failed to lookup geo location for IPv4 range #{id}: #{e.message}"
    nil
  end

  # Check if this range has any country information
  def has_country_info?
    ip_api_country.present? || geo2_country.present?
  end

  # Get the best available country code.
  #
  # Blank strings are treated as missing (the same rule #has_country_info?
  # and .enrich_missing_geo_data use), so an empty ip_api_country falls
  # through to geo2_country. Returns nil when neither column has a value.
  def primary_country
    ip_api_country.presence || geo2_country.presence
  end

  private

  # First address of the range as a dotted-quad string, built from the
  # network_start integer.
  def first_ip_string
    IPAddr.new(network_start, Socket::AF_INET).to_s
  end

  # Calculate network_start, network_end and network_prefix from +cidr+.
  #
  # The prefix is taken from the parsed address rather than from the text
  # after "/", so a bare address ("10.0.0.1" => /32) and dotted netmask
  # notation ("10.0.0.0/255.0.0.0" => /8) produce the correct prefix length.
  #
  # Problems with the input are reported as validation errors on :cidr
  # instead of being raised out of the validation callback.
  def calculate_range
    return unless cidr.present?

    ip_addr = IPAddr.new(cidr)
    unless ip_addr.ipv4?
      errors.add(:cidr, "is not an IPv4 CIDR")
      return
    end

    self.network_prefix = ip_addr.prefix

    # Calculate network range
    range = ip_addr.to_range
    self.network_start = range.first.to_i
    self.network_end = range.last.to_i
  rescue IPAddr::Error => e
    errors.add(:cidr, "invalid IPv4 CIDR notation: #{e.message}")
  end
end