First commit!
This commit is contained in:
287
app/models/event.rb
Normal file
287
app/models/event.rb
Normal file
@@ -0,0 +1,287 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Event < ApplicationRecord
|
||||
belongs_to :project
|
||||
|
||||
# Normalized association for hosts (most valuable compression)
|
||||
belongs_to :request_host, optional: true
|
||||
|
||||
# Enums for fixed value sets
|
||||
enum :waf_action, {
|
||||
allow: 0, # allow/pass
|
||||
deny: 1, # deny/block
|
||||
redirect: 2, # redirect
|
||||
challenge: 3 # challenge (future implementation)
|
||||
}, default: :allow, scopes: false
|
||||
|
||||
enum :request_method, {
|
||||
get: 0, # GET
|
||||
post: 1, # POST
|
||||
put: 2, # PUT
|
||||
patch: 3, # PATCH
|
||||
delete: 4, # DELETE
|
||||
head: 5, # HEAD
|
||||
options: 6 # OPTIONS
|
||||
}, default: :get, scopes: false
|
||||
|
||||
# Serialize segment IDs as array for easy manipulation in Railssqit
|
||||
serialize :request_segment_ids, type: Array, coder: JSON
|
||||
|
||||
validates :event_id, presence: true, uniqueness: true
|
||||
validates :timestamp, presence: true
|
||||
|
||||
scope :recent, -> { order(timestamp: :desc) }
|
||||
scope :by_ip, ->(ip) { where(ip_address: ip) }
|
||||
scope :by_user_agent, ->(user_agent) { where(user_agent: user_agent) }
|
||||
scope :by_waf_action, ->(waf_action) { where(waf_action: waf_action) }
|
||||
scope :blocked, -> { where(waf_action: ['block', 'deny']) }
|
||||
scope :allowed, -> { where(waf_action: ['allow', 'pass']) }
|
||||
scope :rate_limited, -> { where(waf_action: 'rate_limit') }
|
||||
|
||||
# Path prefix matching using range queries (uses B-tree index efficiently)
|
||||
scope :with_path_prefix, ->(prefix_segment_ids) {
|
||||
return none if prefix_segment_ids.blank?
|
||||
|
||||
# Use range queries instead of LIKE for index usage
|
||||
# Example: [1,2] prefix matches [1,2], [1,2,3], [1,2,3,4], etc.
|
||||
prefix_str = prefix_segment_ids.to_json # "[1,2]"
|
||||
|
||||
# For exact match: request_segment_ids = "[1,2]"
|
||||
# For prefix match: "[1,2," <= request_segment_ids < "[1,3,"
|
||||
# This works because JSON arrays sort lexicographically
|
||||
|
||||
# Build the range upper bound by incrementing last segment ID
|
||||
upper_prefix = prefix_segment_ids[0..-2] + [prefix_segment_ids.last + 1]
|
||||
upper_str = upper_prefix.to_json
|
||||
lower_prefix_str = "#{prefix_str[0..-2]}," # "[1,2," - matches longer paths
|
||||
|
||||
# Use raw SQL to bypass serializer (it expects Array but we're comparing strings)
|
||||
where("request_segment_ids = ? OR (request_segment_ids >= ? AND request_segment_ids < ?)",
|
||||
prefix_str, lower_prefix_str, upper_str)
|
||||
}
|
||||
|
||||
# Path depth queries
|
||||
scope :path_depth, ->(depth) {
|
||||
where("json_array_length(request_segment_ids) = ?", depth)
|
||||
}
|
||||
|
||||
scope :path_depth_greater_than, ->(depth) {
|
||||
where("json_array_length(request_segment_ids) > ?", depth)
|
||||
}
|
||||
|
||||
# Helper methods
|
||||
def path_depth
|
||||
request_segment_ids&.length || 0
|
||||
end
|
||||
|
||||
def reconstructed_path
|
||||
return request_path if request_segment_ids.blank?
|
||||
|
||||
segments = PathSegment.where(id: request_segment_ids).index_by(&:id)
|
||||
'/' + request_segment_ids.map { |id| segments[id]&.segment }.compact.join('/')
|
||||
end
|
||||
|
||||
# Extract key fields from payload before saving
|
||||
before_validation :extract_fields_from_payload
|
||||
|
||||
# Normalize event fields after extraction
|
||||
after_validation :normalize_event_fields, if: :should_normalize?
|
||||
|
||||
def self.create_from_waf_payload!(event_id, payload, project)
|
||||
# Create the WAF request event
|
||||
create!(
|
||||
project: project,
|
||||
event_id: event_id,
|
||||
timestamp: parse_timestamp(payload["timestamp"]),
|
||||
payload: payload,
|
||||
|
||||
# WAF-specific fields
|
||||
ip_address: payload.dig("request", "ip"),
|
||||
user_agent: payload.dig("request", "headers", "User-Agent"),
|
||||
request_method: payload.dig("request", "method")&.downcase,
|
||||
request_path: payload.dig("request", "path"),
|
||||
request_url: payload.dig("request", "url"),
|
||||
request_protocol: payload.dig("request", "protocol"),
|
||||
response_status: payload.dig("response", "status_code"),
|
||||
response_time_ms: payload.dig("response", "duration_ms"),
|
||||
waf_action: normalize_action(payload["waf_action"]), # Normalize incoming action values
|
||||
rule_matched: payload["rule_matched"],
|
||||
blocked_reason: payload["blocked_reason"],
|
||||
|
||||
# Server/Environment info
|
||||
server_name: payload["server_name"],
|
||||
environment: payload["environment"],
|
||||
|
||||
# Geographic data
|
||||
country_code: payload.dig("geo", "country_code"),
|
||||
city: payload.dig("geo", "city"),
|
||||
|
||||
# WAF agent info
|
||||
agent_version: payload.dig("agent", "version"),
|
||||
agent_name: payload.dig("agent", "name")
|
||||
)
|
||||
end
|
||||
|
||||
def self.normalize_action(action)
|
||||
return "allow" if action.nil? || action.blank?
|
||||
|
||||
case action.to_s.downcase
|
||||
when "allow", "pass", "allowed"
|
||||
"allow"
|
||||
when "deny", "block", "blocked", "deny_access"
|
||||
"deny"
|
||||
when "challenge"
|
||||
"challenge"
|
||||
when "redirect"
|
||||
"redirect"
|
||||
else
|
||||
Rails.logger.warn "Unknown action '#{action}', defaulting to 'allow'"
|
||||
"allow"
|
||||
end
|
||||
end
|
||||
|
||||
def self.parse_timestamp(timestamp)
|
||||
case timestamp
|
||||
when String
|
||||
Time.parse(timestamp)
|
||||
when Numeric
|
||||
# Sentry timestamps can be in seconds with decimals
|
||||
Time.at(timestamp)
|
||||
when Time
|
||||
timestamp
|
||||
else
|
||||
Time.current
|
||||
end
|
||||
rescue => e
|
||||
Rails.logger.error "Failed to parse timestamp #{timestamp}: #{e.message}"
|
||||
Time.current
|
||||
end
|
||||
|
||||
def request_details
|
||||
return {} unless payload.present?
|
||||
|
||||
request_data = payload.dig("request") || {}
|
||||
{
|
||||
ip: request_data["ip"],
|
||||
method: request_data["method"],
|
||||
path: request_data["path"],
|
||||
url: request_data["url"],
|
||||
protocol: request_data["protocol"],
|
||||
headers: request_data["headers"] || {},
|
||||
query: request_data["query"] || {},
|
||||
body_size: request_data["body_size"]
|
||||
}
|
||||
end
|
||||
|
||||
def response_details
|
||||
return {} unless payload.present?
|
||||
|
||||
response_data = payload.dig("response") || {}
|
||||
{
|
||||
status_code: response_data["status_code"],
|
||||
duration_ms: response_data["duration_ms"],
|
||||
size: response_data["size"]
|
||||
}
|
||||
end
|
||||
|
||||
def geo_details
|
||||
return {} unless payload.present?
|
||||
|
||||
payload.dig("geo") || {}
|
||||
end
|
||||
|
||||
def tags
|
||||
payload&.dig("tags") || {}
|
||||
end
|
||||
|
||||
def headers
|
||||
payload&.dig("request", "headers") || {}
|
||||
end
|
||||
|
||||
def query_params
|
||||
payload&.dig("request", "query") || {}
|
||||
end
|
||||
|
||||
def blocked?
|
||||
waf_action.in?(['block', 'deny'])
|
||||
end
|
||||
|
||||
def allowed?
|
||||
waf_action.in?(['allow', 'pass'])
|
||||
end
|
||||
|
||||
def rate_limited?
|
||||
waf_action == 'rate_limit'
|
||||
end
|
||||
|
||||
def challenged?
|
||||
waf_action == 'challenge'
|
||||
end
|
||||
|
||||
def rule_matched?
|
||||
rule_matched.present?
|
||||
end
|
||||
|
||||
# New path methods for normalization
|
||||
def path_segments
|
||||
return [] unless request_path.present?
|
||||
request_path.split('/').reject(&:blank?)
|
||||
end
|
||||
|
||||
def path_segments_array
|
||||
@path_segments_array ||= path_segments
|
||||
end
|
||||
|
||||
def request_hostname
|
||||
return nil unless request_url.present?
|
||||
URI.parse(request_url).hostname rescue nil
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def should_normalize?
|
||||
request_host_id.nil? || request_segment_ids.blank?
|
||||
end
|
||||
|
||||
def normalize_event_fields
|
||||
EventNormalizer.normalize_event!(self)
|
||||
rescue => e
|
||||
Rails.logger.error "Failed to normalize event #{id}: #{e.message}"
|
||||
end
|
||||
|
||||
def extract_fields_from_payload
|
||||
return unless payload.present?
|
||||
|
||||
# Extract WAF-specific fields for direct querying
|
||||
request_data = payload.dig("request") || {}
|
||||
response_data = payload.dig("response") || {}
|
||||
|
||||
self.ip_address = request_data["ip"]
|
||||
self.user_agent = request_data.dig("headers", "User-Agent")
|
||||
self.request_path = request_data["path"]
|
||||
self.request_url = request_data["url"]
|
||||
self.response_status = response_data["status_code"]
|
||||
self.response_time_ms = response_data["duration_ms"]
|
||||
self.rule_matched = payload["rule_matched"]
|
||||
self.blocked_reason = payload["blocked_reason"]
|
||||
|
||||
# Store original values for normalization (these will be normalized to IDs)
|
||||
@raw_request_method = request_data["method"]
|
||||
@raw_request_protocol = request_data["protocol"]
|
||||
@raw_action = payload["waf_action"]
|
||||
|
||||
# Extract server/environment info
|
||||
self.server_name = payload["server_name"]
|
||||
self.environment = payload["environment"]
|
||||
|
||||
# Extract geographic data
|
||||
geo_data = payload.dig("geo") || {}
|
||||
self.country_code = geo_data["country_code"]
|
||||
self.city = geo_data["city"]
|
||||
|
||||
# Extract agent info
|
||||
agent_data = payload.dig("agent") || {}
|
||||
self.agent_version = agent_data["version"]
|
||||
self.agent_name = agent_data["name"]
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user