# frozen_string_literal: true

class Event < ApplicationRecord
  belongs_to :project

  # Normalized association for hosts (most valuable compression)
  belongs_to :request_host, optional: true

  # Enums for fixed value sets
  enum :waf_action, {
    allow: 0,     # allow/pass
    deny: 1,      # deny/block
    redirect: 2,  # redirect
    challenge: 3  # challenge (future implementation)
  }, default: :allow, scopes: false

  enum :request_method, {
    get: 0,     # GET
    post: 1,    # POST
    put: 2,     # PUT
    patch: 3,   # PATCH
    delete: 4,  # DELETE
    head: 5,    # HEAD
    options: 6  # OPTIONS
  }, default: :get, scopes: false

  # Serialize segment IDs as array for easy manipulation in Rails
  serialize :request_segment_ids, type: Array, coder: JSON

  validates :event_id, presence: true, uniqueness: true
  validates :timestamp, presence: true

  scope :recent, -> { order(timestamp: :desc) }
  scope :by_ip, ->(ip) { where(ip_address: ip) }
  scope :by_user_agent, ->(user_agent) { where(user_agent: user_agent) }
  scope :by_waf_action, ->(waf_action) { where(waf_action: waf_action) }
  # Incoming "block"/"pass" values are mapped to "deny"/"allow" on import
  # (see normalize_action), so only the canonical enum values are queried here.
  scope :blocked, -> { where(waf_action: :deny) }
  scope :allowed, -> { where(waf_action: :allow) }
  scope :rate_limited, -> { where(waf_action: 'rate_limit') }

  # Path prefix matching using range queries (uses B-tree index efficiently)
  scope :with_path_prefix, ->(prefix_segment_ids) {
    return none if prefix_segment_ids.blank?

    # Use range queries instead of LIKE for index usage
    # Example: [1,2] prefix matches [1,2], [1,2,3], [1,2,3,4], etc.
    prefix_str = prefix_segment_ids.to_json # "[1,2]"

    # For exact match: request_segment_ids = "[1,2]"
    # For prefix match: "[1,2," <= request_segment_ids < "[1,3]"
    # This works because JSON arrays sort lexicographically

    # Build the range upper bound by incrementing the last segment ID
    upper_prefix = prefix_segment_ids[0..-2] + [prefix_segment_ids.last + 1]
    upper_str = upper_prefix.to_json
    lower_prefix_str = "#{prefix_str[0..-2]}," # "[1,2," - matches longer paths

    # Use raw SQL to bypass the serializer (it expects an Array but we're comparing strings)
    where("request_segment_ids = ? OR (request_segment_ids >= ? AND request_segment_ids < ?)",
          prefix_str, lower_prefix_str, upper_str)
  }

  # Path depth queries
  scope :path_depth, ->(depth) { where("json_array_length(request_segment_ids) = ?", depth) }
  scope :path_depth_greater_than, ->(depth) { where("json_array_length(request_segment_ids) > ?", depth) }

  # Helper methods
  def path_depth
    request_segment_ids&.length || 0
  end

  def reconstructed_path
    return request_path if request_segment_ids.blank?

    segments = PathSegment.where(id: request_segment_ids).index_by(&:id)
    '/' + request_segment_ids.map { |id| segments[id]&.segment }.compact.join('/')
  end

  # Extract key fields from payload before saving
  before_validation :extract_fields_from_payload

  # Normalize event fields after extraction
  after_validation :normalize_event_fields, if: :should_normalize?
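
  # Ingest entry point for a raw WAF payload hash. A usage sketch: the caller
  # (e.g. a webhook controller or import job) and the variable names are
  # assumptions, not something this model defines.
  #
  #   Event.create_from_waf_payload!(event_id, payload_hash, project)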
  def self.create_from_waf_payload!(event_id, payload, project)
    # Normalize headers in payload during import phase
    normalized_payload = normalize_payload_headers(payload)

    # Create the WAF request event
    create!(
      project: project,
      event_id: event_id,
      timestamp: parse_timestamp(normalized_payload["timestamp"]),
      payload: normalized_payload,

      # WAF-specific fields
      ip_address: normalized_payload.dig("request", "ip"),
      user_agent: normalized_payload.dig("request", "headers", "user-agent"), # header keys are downcased above
      # request_method will be set by extract_fields_from_payload + normalize_event_fields
      request_path: normalized_payload.dig("request", "path"),
      request_url: normalized_payload.dig("request", "url"),
      # request_protocol will be set by extract_fields_from_payload + normalize_event_fields
      response_status: normalized_payload.dig("response", "status_code"),
      response_time_ms: normalized_payload.dig("response", "duration_ms"),
      waf_action: normalize_action(normalized_payload["waf_action"]), # Normalize incoming action values
      rule_matched: normalized_payload["rule_matched"],
      blocked_reason: normalized_payload["blocked_reason"],

      # Server/Environment info
      server_name: normalized_payload["server_name"],
      environment: normalized_payload["environment"],

      # Geographic data
      country_code: normalized_payload.dig("geo", "country_code"),
      city: normalized_payload.dig("geo", "city"),

      # WAF agent info
      agent_version: normalized_payload.dig("agent", "version"),
      agent_name: normalized_payload.dig("agent", "name")
    )
  end

  # Normalize headers in payload to lower case during import phase
  def self.normalize_payload_headers(payload)
    return payload unless payload.is_a?(Hash)

    # Create a deep copy to avoid modifying the original
    normalized = Marshal.load(Marshal.dump(payload))

    # Normalize request headers
    if normalized.dig("request", "headers").is_a?(Hash)
      normalized["request"]["headers"] = normalized["request"]["headers"].transform_keys(&:downcase)
    end

    # Normalize response headers if they exist
    if normalized.dig("response", "headers").is_a?(Hash)
      normalized["response"]["headers"] = normalized["response"]["headers"].transform_keys(&:downcase)
    end

    normalized
  end

  def self.normalize_action(action)
    return "allow" if action.blank?

    case action.to_s.downcase
    when "allow", "pass", "allowed"
      "allow"
    when "deny", "block", "blocked", "deny_access"
      "deny"
    when "challenge"
      "challenge"
    when "redirect"
      "redirect"
    else
      Rails.logger.warn "Unknown action '#{action}', defaulting to 'allow'"
      "allow"
    end
  end

  def self.parse_timestamp(timestamp)
    case timestamp
    when String
      Time.parse(timestamp)
    when Numeric
      # Sentry timestamps can be in seconds with decimals
      Time.at(timestamp)
    when Time
      timestamp
    else
      Time.current
    end
  rescue => e
    Rails.logger.error "Failed to parse timestamp #{timestamp}: #{e.message}"
    Time.current
  end
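
  # Convenience readers over the stored payload; each returns an empty hash when
  # the payload (or the relevant sub-section) is missing. A hedged example with
  # hypothetical payload values:
  #
  #   event.request_details[:path]          # => "/api/users"
  #   event.response_details[:status_code]  # => 403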
payload.dig("geo") || {} end def tags payload&.dig("tags") || {} end def headers raw_headers = payload&.dig("request", "headers") || {} normalize_headers(raw_headers) end def query_params payload&.dig("request", "query") || {} end def blocked? waf_action.in?(['block', 'deny']) end def allowed? waf_action.in?(['allow', 'pass']) end def rate_limited? waf_action == 'rate_limit' end def challenged? waf_action == 'challenge' end def rule_matched? rule_matched.present? end # New path methods for normalization def path_segments return [] unless request_path.present? request_path.split('/').reject(&:blank?) end def path_segments_array @path_segments_array ||= path_segments end def request_hostname return nil unless request_url.present? URI.parse(request_url).hostname rescue nil end # Normalize headers to lower case keys during import phase def normalize_headers(headers) return {} unless headers.is_a?(Hash) headers.transform_keys(&:downcase) end # Network range resolution methods def matching_network_ranges return [] unless ip_address.present? NetworkRange.contains_ip(ip_address).map do |range| { range: range, cidr: range.cidr, prefix_length: range.prefix_length, specificity: range.prefix_length, intelligence: range.inherited_intelligence } end.sort_by { |r| -r[:specificity] } # Most specific first end def most_specific_range matching_network_ranges.first end def broadest_range matching_network_ranges.last end def network_intelligence most_specific_range&.dig(:intelligence) || {} end def company network_intelligence[:company] end def asn network_intelligence[:asn] end def asn_org network_intelligence[:asn_org] end def is_datacenter? network_intelligence[:is_datacenter] || false end def is_proxy? network_intelligence[:is_proxy] || false end def is_vpn? network_intelligence[:is_vpn] || false end # IP validation def valid_ipv4? return false unless ip_address.present? IPAddr.new(ip_address).ipv4? rescue IPAddr::InvalidAddressError false end def valid_ipv6? return false unless ip_address.present? IPAddr.new(ip_address).ipv6? rescue IPAddr::InvalidAddressError false end def valid_ip? valid_ipv4? || valid_ipv6? end # Rules affecting this IP def matching_rules return Rule.none unless ip_address.present? # Get all network ranges that contain this IP range_ids = matching_network_ranges.map { |r| r[:range].id } # Find rules for those ranges, ordered by priority (most specific first) Rule.network_rules .where(network_range_id: range_ids) .enabled .includes(:network_range) .order('masklen(network_ranges.network) DESC') end def active_blocking_rules matching_rules.where(action: 'deny') end def has_blocking_rules? active_blocking_rules.exists? end # GeoIP enrichment methods (now uses network range data when available) def enrich_geo_location! return if ip_address.blank? return if country_code.present? # Already has geo data # First try to get from network range network_info = network_intelligence if network_info[:country].present? update!(country_code: network_info[:country]) return end # Fallback to direct lookup country = GeoIpService.lookup_country(ip_address) update!(country_code: country) if country.present? rescue => e Rails.logger.error "Failed to enrich geo location for event #{id}: #{e.message}" end # Class method to enrich multiple events def self.enrich_geo_location_batch(events = nil) events ||= where(country_code: [nil, '']).where.not(ip_address: [nil, '']) updated_count = 0 events.find_each do |event| next if event.country_code.present? 
  def self.enrich_geo_location_batch(events = nil)
    events ||= where(country_code: [nil, '']).where.not(ip_address: [nil, ''])

    updated_count = 0
    events.find_each do |event|
      next if event.country_code.present?

      # Try network range first
      network_info = event.network_intelligence
      if network_info[:country].present?
        event.update!(country_code: network_info[:country])
        updated_count += 1
        next
      end

      # Fallback to direct lookup
      country = GeoIpService.lookup_country(event.ip_address)
      if country.present?
        event.update!(country_code: country)
        updated_count += 1
      end
    end

    updated_count
  end

  # Lookup country code for this event's IP
  def lookup_country
    return country_code if country_code.present?
    return nil if ip_address.blank?

    # First try network range
    network_info = network_intelligence
    return network_info[:country] if network_info[:country].present?

    # Fallback to direct lookup
    GeoIpService.lookup_country(ip_address)
  rescue => e
    Rails.logger.error "GeoIP lookup failed for #{ip_address}: #{e.message}"
    nil
  end

  # Check if event has valid geo location data
  def has_geo_data?
    country_code.present? || city.present? || network_intelligence[:country].present?
  end

  # Get full geo location details
  def geo_location
    network_info = network_intelligence
    {
      country_code: country_code || network_info[:country],
      city: city,
      ip_address: ip_address,
      has_data: has_geo_data?,
      network_intelligence: network_info
    }
  end

  private

  def should_normalize?
    request_host_id.nil? || request_segment_ids.blank?
  end

  def normalize_event_fields
    EventNormalizer.normalize_event!(self)
  rescue => e
    Rails.logger.error "Failed to normalize event #{id}: #{e.message}"
  end

  def extract_fields_from_payload
    return unless payload.present?

    # Extract WAF-specific fields for direct querying
    request_data = payload.dig("request") || {}
    response_data = payload.dig("response") || {}

    self.ip_address = request_data["ip"]

    # Extract user agent with header name standardization
    headers = request_data["headers"] || {}
    normalized_headers = normalize_headers(headers)
    self.user_agent = normalized_headers["user-agent"] # keys are downcased by normalize_headers

    self.request_path = request_data["path"]
    self.request_url = request_data["url"]
    self.response_status = response_data["status_code"]
    self.response_time_ms = response_data["duration_ms"]
    self.rule_matched = payload["rule_matched"]
    self.blocked_reason = payload["blocked_reason"]

    # Store original values for normalization only if they don't exist yet
    # This prevents overwriting during multiple callback runs
    @raw_request_method ||= request_data["method"]
    @raw_request_protocol ||= request_data["protocol"]
    @raw_action ||= payload["waf_action"]

    # Extract server/environment info
    self.server_name = payload["server_name"]
    self.environment = payload["environment"]

    # Extract geographic data
    geo_data = payload.dig("geo") || {}
    self.country_code = geo_data["country_code"]
    self.city = geo_data["city"]

    # Extract agent info
    agent_data = payload.dig("agent") || {}
    self.agent_version = agent_data["version"]
    self.agent_name = agent_data["name"]
  end
end
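
# Query usage sketch for the scopes above. The segment IDs here are hypothetical;
# real queries would resolve them from PathSegment records first.
#
#   Event.blocked.recent.limit(50)
#   Event.with_path_prefix([1, 2])    # exact path plus anything nested below it
#   Event.path_depth_greater_than(3)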