baffle-hub/app/models/event.rb

# frozen_string_literal: true

class Event < ApplicationRecord
  # Normalized association for hosts (most valuable compression)
  belongs_to :request_host, optional: true

  # Enums for fixed value sets
  enum :waf_action, {
    allow: 0,     # allow/pass
    deny: 1,      # deny/block
    redirect: 2,  # redirect
    challenge: 3  # challenge (future implementation)
  }, default: :allow, scopes: false

  enum :request_method, {
    get: 0,     # GET
    post: 1,    # POST
    put: 2,     # PUT
    patch: 3,   # PATCH
    delete: 4,  # DELETE
    head: 5,    # HEAD
    options: 6  # OPTIONS
  }, default: :get, scopes: false

  # Serialize segment IDs as an array for easy manipulation in Rails
  serialize :request_segment_ids, type: Array, coder: JSON

  validates :event_id, presence: true, uniqueness: true
  validates :timestamp, presence: true

  scope :recent, -> { order(timestamp: :desc) }
  scope :by_ip, ->(ip) { where(ip_address: ip) }
  scope :by_user_agent, ->(user_agent) { where(user_agent: user_agent) }
  scope :by_waf_action, ->(waf_action) { where(waf_action: waf_action) }
  # waf_action only stores enum keys ('block'/'pass' are normalized to 'deny'/'allow' on import)
  scope :blocked, -> { where(waf_action: :deny) }
  scope :allowed, -> { where(waf_action: :allow) }
  # The enum does not define a rate_limit action yet, so there is nothing for this scope to match
  scope :rate_limited, -> { none }

  # Path prefix matching using range queries (uses the B-tree index efficiently)
  scope :with_path_prefix, ->(prefix_segment_ids) {
    return none if prefix_segment_ids.blank?

    # Compare the serialized JSON strings with range predicates instead of LIKE so the
    # index can be used. Example: the prefix [1,2] matches [1,2], [1,2,3], [1,2,3,4], etc.
    prefix_str = prefix_segment_ids.to_json  # "[1,2]"  -- exact match
    lower_str = "#{prefix_str[0..-2]},"      # "[1,2,"  -- every longer matching path starts with this
    upper_str = "#{prefix_str[0..-2]}-"      # "[1,2-"  -- '-' is the ASCII character right after ','
    # Longer paths sharing the prefix serialize to strings in ["[1,2,", "[1,2-"),
    # while e.g. [1,29] serializes to "[1,29]" and falls outside that range.
    # Use raw SQL to bypass the serializer (it expects an Array but we're comparing strings)
    where("request_segment_ids = ? OR (request_segment_ids >= ? AND request_segment_ids < ?)",
          prefix_str, lower_str, upper_str)
  }
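  # Illustrative use (segment IDs are hypothetical):
  #   Event.with_path_prefix([1, 2])
  #   # => WHERE request_segment_ids = '[1,2]'
  #   #       OR (request_segment_ids >= '[1,2,' AND request_segment_ids < '[1,2-')
  #   # matches [1,2], [1,2,3], [1,2,3,4], ... but not [1,29] or [1,3]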

  # Path depth queries
  scope :path_depth, ->(depth) {
    where("json_array_length(request_segment_ids) = ?", depth)
  }
  scope :path_depth_greater_than, ->(depth) {
    where("json_array_length(request_segment_ids) > ?", depth)
  }

  # Helper methods
  def path_depth
    request_segment_ids&.length || 0
  end

  def reconstructed_path
    return request_path if request_segment_ids.blank?

    segments = PathSegment.where(id: request_segment_ids).index_by(&:id)
    '/' + request_segment_ids.map { |id| segments[id]&.segment }.compact.join('/')
  end
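  # Illustrative mapping (PathSegment ids are hypothetical): if segments 1 and 2 are
  # "api" and "users", an event with request_segment_ids [1, 2] reconstructs to "/api/users".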

  # Extract key fields from payload before saving
  before_validation :extract_fields_from_payload
  # Normalize event fields after extraction
  after_validation :normalize_event_fields, if: :should_normalize?

  def self.create_from_waf_payload!(event_id, payload)
    # Normalize headers in payload during import phase
    normalized_payload = normalize_payload_headers(payload)

    # Create the WAF request event
    create!(
      event_id: event_id,
      timestamp: parse_timestamp(normalized_payload["timestamp"]),
      payload: normalized_payload,
      # WAF-specific fields
      ip_address: normalized_payload.dig("request", "ip"),
      user_agent: normalized_payload.dig("request", "headers", "user-agent") || normalized_payload.dig("request", "headers", "User-Agent"),
      # request_method will be set by extract_fields_from_payload + normalize_event_fields
      request_path: normalized_payload.dig("request", "path"),
      request_url: normalized_payload.dig("request", "url"),
      # request_protocol will be set by extract_fields_from_payload + normalize_event_fields
      response_status: normalized_payload.dig("response", "status_code"),
      response_time_ms: normalized_payload.dig("response", "duration_ms"),
      waf_action: normalize_action(normalized_payload["waf_action"]), # Normalize incoming action values
      rule_matched: normalized_payload["rule_matched"],
      blocked_reason: normalized_payload["blocked_reason"],
      # Server/Environment info
      server_name: normalized_payload["server_name"],
      environment: normalized_payload["environment"],
      # Geographic data
      country_code: normalized_payload.dig("geo", "country_code"),
      city: normalized_payload.dig("geo", "city"),
      # WAF agent info
      agent_version: normalized_payload.dig("agent", "version"),
      agent_name: normalized_payload.dig("agent", "name")
    )
  end
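  # Illustrative call with a minimal payload (identifiers and values are made up;
  # the key layout mirrors the dig calls above):
  #   Event.create_from_waf_payload!("evt-123", {
  #     "timestamp"  => "2025-11-09T09:58:13Z",
  #     "waf_action" => "block",
  #     "request"    => { "ip" => "203.0.113.7", "path" => "/api/users",
  #                       "headers" => { "User-Agent" => "curl/8.0" } },
  #     "response"   => { "status_code" => 403, "duration_ms" => 12 }
  #   })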

  # Normalize headers in payload to lower case during import phase
  def self.normalize_payload_headers(payload)
    return payload unless payload.is_a?(Hash)

    # Create a deep copy to avoid modifying the original
    normalized = Marshal.load(Marshal.dump(payload))

    # Normalize request headers
    if normalized.dig("request", "headers").is_a?(Hash)
      normalized["request"]["headers"] = normalized["request"]["headers"].transform_keys(&:downcase)
    end

    # Normalize response headers if they exist
    if normalized.dig("response", "headers").is_a?(Hash)
      normalized["response"]["headers"] = normalized["response"]["headers"].transform_keys(&:downcase)
    end

    normalized
  end

  def self.normalize_action(action)
    return "allow" if action.blank?

    case action.to_s.downcase
    when "allow", "pass", "allowed"
      "allow"
    when "deny", "block", "blocked", "deny_access"
      "deny"
    when "challenge"
      "challenge"
    when "redirect"
      "redirect"
    else
      Rails.logger.warn "Unknown action '#{action}', defaulting to 'allow'"
      "allow"
    end
  end

  def self.parse_timestamp(timestamp)
    case timestamp
    when String
      Time.parse(timestamp)
    when Numeric
      # Sentry timestamps can be in seconds with decimals
      Time.at(timestamp)
    when Time
      timestamp
    else
      Time.current
    end
  rescue => e
    Rails.logger.error "Failed to parse timestamp #{timestamp}: #{e.message}"
    Time.current
  end
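  # Accepted timestamp shapes (example values are illustrative):
  #   parse_timestamp("2025-11-09T09:58:13Z")  # ISO 8601 string
  #   parse_timestamp(1731146293.5)            # epoch seconds, decimals allowed
  #   parse_timestamp(Time.current)            # returned as-is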

  def request_details
    return {} unless payload.present?

    request_data = payload.dig("request") || {}
    {
      ip: request_data["ip"],
      method: request_data["method"],
      path: request_data["path"],
      url: request_data["url"],
      protocol: request_data["protocol"],
      headers: request_data["headers"] || {},
      query: request_data["query"] || {},
      body_size: request_data["body_size"]
    }
  end

  def response_details
    return {} unless payload.present?

    response_data = payload.dig("response") || {}
    {
      status_code: response_data["status_code"],
      duration_ms: response_data["duration_ms"],
      size: response_data["size"]
    }
  end

  def geo_details
    return {} unless payload.present?

    payload.dig("geo") || {}
  end

  def tags
    payload&.dig("tags") || {}
  end

  def headers
    raw_headers = payload&.dig("request", "headers") || {}
    normalize_headers(raw_headers)
  end

  def query_params
    payload&.dig("request", "query") || {}
  end

  def blocked?
    # 'block' is normalized to 'deny' on import, so only the enum key is checked
    waf_action == 'deny'
  end

  def allowed?
    # 'pass' is normalized to 'allow' on import, so only the enum key is checked
    waf_action == 'allow'
  end

  def rate_limited?
    # The waf_action enum does not define a rate_limit value yet, so this is always false
    false
  end

  def challenged?
    waf_action == 'challenge'
  end

  def rule_matched?
    rule_matched.present?
  end

  # New path methods for normalization
  def path_segments
    return [] unless request_path.present?

    request_path.split('/').reject(&:blank?)
  end

  def path_segments_array
    @path_segments_array ||= path_segments
  end

  def request_hostname
    return nil unless request_url.present?

    URI.parse(request_url).hostname rescue nil
  end

  # Normalize headers to lower case keys during import phase
  def normalize_headers(headers)
    return {} unless headers.is_a?(Hash)

    headers.transform_keys(&:downcase)
  end

  # Network range resolution methods
  def matching_network_ranges
    return [] unless ip_address.present?

    NetworkRange.contains_ip(ip_address).map do |range|
      {
        range: range,
        cidr: range.cidr,
        prefix_length: range.prefix_length,
        specificity: range.prefix_length,
        intelligence: range.inherited_intelligence
      }
    end.sort_by { |r| -r[:specificity] } # Most specific first
  end

  def most_specific_range
    matching_network_ranges.first
  end

  def broadest_range
    matching_network_ranges.last
  end

  def network_intelligence
    most_specific_range&.dig(:intelligence) || {}
  end

  def company
    network_intelligence[:company]
  end

  def asn
    network_intelligence[:asn]
  end

  def asn_org
    network_intelligence[:asn_org]
  end

  def is_datacenter?
    network_intelligence[:is_datacenter] || false
  end

  def is_proxy?
    network_intelligence[:is_proxy] || false
  end

  def is_vpn?
    network_intelligence[:is_vpn] || false
  end

  # IP validation
  def valid_ipv4?
    return false unless ip_address.present?

    IPAddr.new(ip_address).ipv4?
  rescue IPAddr::InvalidAddressError
    false
  end

  def valid_ipv6?
    return false unless ip_address.present?

    IPAddr.new(ip_address).ipv6?
  rescue IPAddr::InvalidAddressError
    false
  end

  def valid_ip?
    valid_ipv4? || valid_ipv6?
  end

  # Rules affecting this IP
  def matching_rules
    return Rule.none unless ip_address.present?

    # Get all network ranges that contain this IP
    range_ids = matching_network_ranges.map { |r| r[:range].id }

    # Find rules for those ranges, ordered by priority (most specific first).
    # references ensures network_ranges is joined so the masklen(...) ordering is valid,
    # and Arel.sql marks the ordering fragment as trusted raw SQL.
    Rule.network_rules
        .where(network_range_id: range_ids)
        .enabled
        .includes(:network_range)
        .references(:network_ranges)
        .order(Arel.sql('masklen(network_ranges.network) DESC'))
  end

  def active_blocking_rules
    matching_rules.where(action: 'deny')
  end

  def has_blocking_rules?
    active_blocking_rules.exists?
  end

  # GeoIP enrichment methods (now uses network range data when available)
  def enrich_geo_location!
    return if ip_address.blank?
    return if country_code.present? # Already has geo data

    # First try to get from network range
    network_info = network_intelligence
    if network_info[:country].present?
      update!(country_code: network_info[:country])
      return
    end

    # Fallback to direct lookup
    country = GeoIpService.lookup_country(ip_address)
    update!(country_code: country) if country.present?
  rescue => e
    Rails.logger.error "Failed to enrich geo location for event #{id}: #{e.message}"
  end

  # Class method to enrich multiple events
  def self.enrich_geo_location_batch(events = nil)
    events ||= where(country_code: [nil, '']).where.not(ip_address: [nil, ''])
    updated_count = 0

    events.find_each do |event|
      next if event.country_code.present?

      # Try network range first
      network_info = event.network_intelligence
      if network_info[:country].present?
        event.update!(country_code: network_info[:country])
        updated_count += 1
        next
      end

      # Fallback to direct lookup
      country = GeoIpService.lookup_country(event.ip_address)
      if country.present?
        event.update!(country_code: country)
        updated_count += 1
      end
    end

    updated_count
  end
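  # Illustrative use (typically from a console or a background job; scheduling is outside
  # this model):
  #   Event.enrich_geo_location_batch
  #   # => count of events whose country_code was backfilled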

  # Lookup country code for this event's IP
  def lookup_country
    return country_code if country_code.present?
    return nil if ip_address.blank?

    # First try network range
    network_info = network_intelligence
    return network_info[:country] if network_info[:country].present?

    # Fallback to direct lookup
    GeoIpService.lookup_country(ip_address)
  rescue => e
    Rails.logger.error "GeoIP lookup failed for #{ip_address}: #{e.message}"
    nil
  end

  # Check if event has valid geo location data
  def has_geo_data?
    country_code.present? || city.present? || network_intelligence[:country].present?
  end

  # Get full geo location details
  def geo_location
    network_info = network_intelligence
    {
      country_code: country_code || network_info[:country],
      city: city,
      ip_address: ip_address,
      has_data: has_geo_data?,
      network_intelligence: network_info
    }
  end

  private

  def should_normalize?
    request_host_id.nil? || request_segment_ids.blank?
  end

  def normalize_event_fields
    EventNormalizer.normalize_event!(self)
  rescue => e
    Rails.logger.error "Failed to normalize event #{id}: #{e.message}"
  end

  def extract_fields_from_payload
    return unless payload.present?

    # Extract WAF-specific fields for direct querying
    request_data = payload.dig("request") || {}
    response_data = payload.dig("response") || {}

    self.ip_address = request_data["ip"]

    # Extract user agent with header name standardization
    headers = request_data["headers"] || {}
    normalized_headers = normalize_headers(headers)
    self.user_agent = normalized_headers["user-agent"] || normalized_headers["User-Agent"]

    self.request_path = request_data["path"]
    self.request_url = request_data["url"]
    self.response_status = response_data["status_code"]
    self.response_time_ms = response_data["duration_ms"]
    self.rule_matched = payload["rule_matched"]
    self.blocked_reason = payload["blocked_reason"]

    # Store original values for normalization only if they don't exist yet
    # This prevents overwriting during multiple callback runs
    @raw_request_method ||= request_data["method"]
    @raw_request_protocol ||= request_data["protocol"]
    @raw_action ||= payload["waf_action"]

    # Extract server/environment info
    self.server_name = payload["server_name"]
    self.environment = payload["environment"]

    # Extract geographic data
    geo_data = payload.dig("geo") || {}
    self.country_code = geo_data["country_code"]
    self.city = geo_data["city"]

    # Extract agent info
    agent_data = payload.dig("agent") || {}
    self.agent_version = agent_data["version"]
    self.agent_name = agent_data["name"]
  end
end