diff --git a/app/controllers/events_controller.rb b/app/controllers/events_controller.rb
index 012bce0..08ea8eb 100644
--- a/app/controllers/events_controller.rb
+++ b/app/controllers/events_controller.rb
@@ -20,37 +20,78 @@ class EventsController < ApplicationController
   end
 
+  # Lists events, newest first, filtered by the request params and paginated
+  # 50 per page. Queries DuckDB through EventDdb.search and falls back to the
+  # PostgreSQL Event scopes when that returns nil.
   def index
-    @events = Event.includes(:network_range, :rule).order(timestamp: :desc)
-    Rails.logger.debug "Found #{@events.count} total events"
-    Rails.logger.debug "Action: #{params[:waf_action]}"
+    # Build filters hash from params
+    filters = {}
+    filters[:ip] = params[:ip] if params[:ip].present?
+    filters[:waf_action] = params[:waf_action] if params[:waf_action].present?
+    filters[:country] = params[:country] if params[:country].present?
+    filters[:rule_id] = params[:rule_id] if params[:rule_id].present?
+    filters[:company] = params[:company] if params[:company].present?
+    filters[:network_type] = params[:network_type] if params[:network_type].present?
+    filters[:asn] = params[:asn] if params[:asn].present?
+    filters[:exclude_bots] = params[:exclude_bots] if params[:exclude_bots] == "true"
 
-    # Apply filters
-    @events = @events.by_ip(params[:ip]) if params[:ip].present?
-    @events = @events.by_waf_action(params[:waf_action]) if params[:waf_action].present?
-    @events = @events.by_country(params[:country]) if params[:country].present?
-    @events = @events.where(rule_id: params[:rule_id]) if params[:rule_id].present?
+    # Handle network_cidr filter (requires NetworkRange lookup). A CIDR with no
+    # matching NetworkRange cannot be expressed as a DuckDB filter, so DuckDB is
+    # skipped below and the PostgreSQL scope applies the filter instead of it
+    # being silently dropped.
+    range = NetworkRange.find_by(network: params[:network_cidr]) if params[:network_cidr].present?
+    cidr_unresolved = params[:network_cidr].present? && range.nil?
+    filters[:network_range_id] = range.id if range
 
-    # Network-based filters (now using denormalized columns)
-    @events = @events.by_company(params[:company]) if params[:company].present?
-    @events = @events.by_network_type(params[:network_type]) if params[:network_type].present?
-    @events = @events.by_asn(params[:asn]) if params[:asn].present?
-    @events = @events.by_network_cidr(params[:network_cidr]) if params[:network_cidr].present?
+    # Try DuckDB first, fallback to PostgreSQL if unavailable.
+    # to_i turns a missing or non-numeric page param into 0, so clamp to page 1.
+    page = [params[:page].to_i, 1].max
+    result = EventDdb.search(filters, page: page, per_page: 50) unless cidr_unresolved
 
-    # Bot filtering
-    @events = @events.exclude_bots if params[:exclude_bots] == "true"
+    if result
+      # DuckDB query succeeded
+      @pagy = Pagy.new(count: result[:total_count], page: result[:page], items: result[:per_page])
+      @events = result[:events]
 
-    Rails.logger.debug "Events count after filtering: #{@events.count}"
+      # Load network_range associations for events that have network_range_id
+      network_range_ids = @events.map(&:network_range_id).compact.uniq
+      if network_range_ids.any?
+        network_ranges = NetworkRange.where(id: network_range_ids).index_by(&:id)
+        @events.each do |event|
+          event.network_range = network_ranges[event.network_range_id] if event.network_range_id
+        end
+      end
 
-    # Debug info
-    Rails.logger.debug "Events count before pagination: #{@events.count}"
+      # Load rule associations if needed
+      rule_ids = @events.map(&:rule_id).compact.uniq
+      if rule_ids.any?
+        rules = Rule.where(id: rule_ids).index_by(&:id)
+        @events.each do |event|
+          event.rule = rules[event.rule_id] if event.rule_id
+        end
+      end
 
-    # Paginate
-    @pagy, @events = pagy(@events, items: 50)
+      Rails.logger.debug "[DuckDB] Found #{result[:total_count]} total events, showing page #{result[:page]}"
+    else
+      # Fallback to PostgreSQL
+      Rails.logger.warn "[EventsController] DuckDB unavailable, falling back to PostgreSQL" unless cidr_unresolved
 
-    # Network ranges are now preloaded via includes(:network_range)
-    # The denormalized network_range_id makes this much faster than IP containment lookups
+      @events = Event.includes(:network_range, :rule).order(timestamp: :desc)
 
-    Rails.logger.debug "Events count after pagination: #{@events.count}"
-    Rails.logger.debug "Pagy info: #{@pagy.count} total, #{@pagy.pages} pages"
+      # Apply filters using ActiveRecord scopes
+      @events = @events.by_ip(params[:ip]) if params[:ip].present?
+      @events = @events.by_waf_action(params[:waf_action]) if params[:waf_action].present?
+      @events = @events.by_country(params[:country]) if params[:country].present?
+      @events = @events.where(rule_id: params[:rule_id]) if params[:rule_id].present?
+      @events = @events.by_company(params[:company]) if params[:company].present?
+      @events = @events.by_network_type(params[:network_type]) if params[:network_type].present?
+      @events = @events.by_asn(params[:asn]) if params[:asn].present?
+      @events = @events.by_network_cidr(params[:network_cidr]) if params[:network_cidr].present?
+      @events = @events.exclude_bots if params[:exclude_bots] == "true"
+
+      # Paginate
+      @pagy, @events = pagy(@events, items: 50)
+
+      Rails.logger.debug "[PostgreSQL] Events count: #{@pagy.count} total, #{@pagy.pages} pages"
+    end
   end
 end
\ No newline at end of file
diff --git a/app/models/event_ddb.rb b/app/models/event_ddb.rb
index 3758159..caf8864 100644
--- a/app/models/event_ddb.rb
+++ b/app/models/event_ddb.rb
@@ -6,6 +6,25 @@ require 'ostruct'
 # Provides an ActiveRecord-like interface for querying DuckDB events table
 # Falls back to PostgreSQL Event model if DuckDB is unavailable
 class EventDdb
+  # Enum mappings from integer to string (matching Event model)
+  ACTION_MAP = {
+    0 => "deny",
+    1 => "allow",
+    2 => "redirect",
+    3 => "challenge",
+    4 => "log"
+  }.freeze
+
+  METHOD_MAP = {
+    0 => "get",
+    1 => "post",
+    2 => "put",
+    3 => "patch",
+    4 => "delete",
+    5 => "head",
+    6 => "options"
+  }.freeze
+
   class << self
     # Get DuckDB service
     def service
@@ -624,5 +643,168 @@ class EventDdb
       Rails.logger.error "[EventDdb] Error in bot_traffic_timeline: #{e.message}"
       nil
     end
+
+    # Search events with filters and pagination.
+    #
+    # Supports filters: ip, waf_action, country, rule_id, company, asn,
+    # network_type, network_range_id, exclude_bots.
+    #
+    # @param filters [Hash] filter values keyed by the names above
+    # @param page [Integer] 1-based page number; values below 1 are treated as 1
+    # @param per_page [Integer] page size; values below 1 are treated as 1
+    # @return [Hash, nil] { total_count:, events: [], page:, per_page: }, or nil
+    #   when a StandardError was raised (it is logged) so callers can fall back
+    def search(filters = {}, page: 1, per_page: 50)
+      # A page or per_page below 1 would yield a negative OFFSET or LIMIT.
+      page = [page.to_i, 1].max
+      per_page = [per_page.to_i, 1].max
+
+      service.with_connection do |conn|
+        # Build WHERE clause
+        where_clause, params = build_where_clause(filters)
+
+        # Get total count
+        count_sql = "SELECT COUNT(*) FROM events#{where_clause}"
+        count_result = conn.query(count_sql, *params)
+        total_count = count_result.first&.first || 0
+
+        # Get paginated results
+        offset = (page - 1) * per_page
+
+        data_sql = <<~SQL
+          SELECT
+            id, timestamp, ip_address, network_range_id, country, company,
+            asn, asn_org, is_datacenter, is_vpn, is_proxy, is_bot,
+            waf_action, request_method, response_status, rule_id,
+            request_path, user_agent, tags
+          FROM events
+          #{where_clause}
+          ORDER BY timestamp DESC
+          LIMIT ? OFFSET ?
+        SQL
+
+        result = conn.query(data_sql, *params, per_page, offset)
+
+        # Convert rows to event-like objects
+        events = result.to_a.map { |row| row_to_event(row) }
+
+        {
+          total_count: total_count,
+          events: events,
+          page: page,
+          per_page: per_page
+        }
+      end
+    rescue StandardError => e
+      Rails.logger.error "[EventDdb] Error in search: #{e.message}"
+      Rails.logger.error e.backtrace.join("\n")
+      nil
+    end
+
+    private
+
+    # Build WHERE clause and params from filters hash.
+    # User-supplied values are only ever passed as bind parameters; the SQL text
+    # itself is assembled from fixed fragments.
+    # Returns [where_clause_string, params_array]; the clause is "" with no filters.
+    def build_where_clause(filters)
+      conditions = []
+      params = []
+
+      if filters[:ip].present?
+        conditions << "ip_address = ?"
+        params << filters[:ip]
+      end
+
+      if filters[:waf_action].present?
+        # Convert string action to integer
+        action_int = ACTION_MAP.key(filters[:waf_action].to_s)
+        if action_int
+          conditions << "waf_action = ?"
+          params << action_int
+        else
+          # An unrecognised action must match nothing rather than drop the filter
+          conditions << "1 = 0"
+        end
+      end
+
+      if filters[:country].present?
+        conditions << "country = ?"
+        params << filters[:country]
+      end
+
+      if filters[:rule_id].present?
+        conditions << "rule_id = ?"
+        params << filters[:rule_id].to_i
+      end
+
+      if filters[:company].present?
+        conditions << "company ILIKE ?"
+        params << "%#{filters[:company]}%"
+      end
+
+      if filters[:asn].present?
+        conditions << "asn = ?"
+        params << filters[:asn].to_i
+      end
+
+      if filters[:network_range_id].present?
+        conditions << "network_range_id = ?"
+        params << filters[:network_range_id].to_i
+      end
+
+      # Network type filter
+      if filters[:network_type].present?
+        case filters[:network_type].to_s.downcase
+        when "datacenter"
+          conditions << "is_datacenter = true"
+        when "vpn"
+          conditions << "is_vpn = true"
+        when "proxy"
+          conditions << "is_proxy = true"
+        when "standard"
+          conditions << "(is_datacenter = false AND is_vpn = false AND is_proxy = false)"
+        end
+      end
+
+      # Bot filtering
+      if filters[:exclude_bots] == true || filters[:exclude_bots] == "true"
+        conditions << "is_bot = false"
+      end
+
+      where_clause = conditions.any? ? " WHERE #{conditions.join(' AND ')}" : ""
+      [where_clause, params]
+    end
+
+    # Convert DuckDB row array to event-like OpenStruct.
+    # Column positions must stay in sync with the SELECT list in search.
+    def row_to_event(row)
+      OpenStruct.new(
+        id: row[0],
+        timestamp: row[1],
+        ip_address: row[2],
+        network_range_id: row[3],
+        country: row[4],
+        company: row[5],
+        asn: row[6],
+        asn_org: row[7],
+        is_datacenter: row[8],
+        is_vpn: row[9],
+        is_proxy: row[10],
+        is_bot: row[11],
+        waf_action: ACTION_MAP[row[12]] || "unknown",
+        request_method: METHOD_MAP[row[13]],
+        response_status: row[14],
+        rule_id: row[15],
+        request_path: row[16],
+        user_agent: row[17],
+        tags: row[18] || [],
+        # Add helper method for country lookup
+        lookup_country: row[4],
+        # Network range will be loaded separately in controller
+        network_range: nil,
+        rule: nil
+      )
+    end
   end
 end
diff --git a/app/models/waf_policy.rb b/app/models/waf_policy.rb
index 7a6a8ab..0f958e6 100644
--- a/app/models/waf_policy.rb
+++ b/app/models/waf_policy.rb
@@ -159,16 +159,30 @@ validate :targets_must_be_array
       return nil
     end
 
-    rule = Rule.create!(
-      waf_rule_type: 'network',
-      waf_action: policy_action.to_sym,
-      network_range: network_range,
-      waf_policy: self,
-      user: user,
-      source: "policy",
-      metadata: build_rule_metadata(network_range),
-      priority: network_range.prefix_length
-    )
+    # Try to create the rule, handling duplicates gracefully
+    begin
+      rule = Rule.create!(
+        waf_rule_type: 'network',
+        waf_action: policy_action.to_sym,
+        network_range: network_range,
+        waf_policy: self,
+        user: user,
+        source: "policy",
+        metadata: build_rule_metadata(network_range),
+        priority: network_range.prefix_length
+      )
+    rescue ActiveRecord::RecordNotUnique
+      # Rule already exists (created by another job or earlier in this job)
+      # Find and return the existing rule
+      Rails.logger.debug "Rule already exists for #{network_range.cidr} with policy #{name}"
+      return Rule.find_by(
+        waf_rule_type: 'network',
+        waf_action: policy_action,
+        network_range: network_range,
+        waf_policy: self,
+        source: "policy"
+      )
+    end
 
     # Handle redirect/challenge specific data
     if redirect_action? && additional_data['redirect_url']
diff --git a/app/services/analytics_duckdb_service.rb b/app/services/analytics_duckdb_service.rb
index f0717a3..3be0b3b 100644
--- a/app/services/analytics_duckdb_service.rb
+++ b/app/services/analytics_duckdb_service.rb
@@ -35,6 +35,9 @@ class AnalyticsDuckdbService
           is_proxy BOOLEAN,
           is_bot BOOLEAN,
           waf_action INTEGER,
+          request_method INTEGER,
+          response_status INTEGER,
+          rule_id BIGINT,
           request_path VARCHAR,
           user_agent VARCHAR,
           tags VARCHAR[]
@@ -122,6 +125,9 @@ class AnalyticsDuckdbService
           :is_proxy,
           :is_bot,
           :waf_action,
+          :request_method,
+          :response_status,
+          :rule_id,
           :request_path,
           :user_agent,
           :tags
@@ -144,6 +150,9 @@ class AnalyticsDuckdbService
           event_data["is_proxy"],
           event_data["is_bot"],
           event_data["waf_action"],
+          event_data["request_method"],
+          event_data["response_status"],
+          event_data["rule_id"],
           event_data["request_path"],
           event_data["user_agent"],
           event_data["tags"] || []
diff --git a/script/backfill_duckdb_new_columns.rb b/script/backfill_duckdb_new_columns.rb
new file mode 100644
index 0000000..17a781b
--- /dev/null
+++ b/script/backfill_duckdb_new_columns.rb
@@ -0,0 +1,155 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+# One-time backfill script to populate new columns in existing DuckDB events
+# This uses DuckDB's JOIN-based UPDATE for maximum performance
+#
+# NOTE(review): this file does not load the Rails environment itself, yet it
+# uses Event, AnalyticsDuckdbService and Time.current, so it presumably has to
+# be run via the Rails runner (bin/rails runner script/backfill_duckdb_new_columns.rb).
+
+require 'csv'
+require 'tempfile'
+
+puts "DuckDB Column Backfill Script (JOIN-based UPDATE)"
+puts "=" * 60
+puts "This will update existing DuckDB events with data from PostgreSQL"
+puts "using a fast JOIN-based approach"
+puts
+
+BATCH_SIZE = 50_000
+
+AnalyticsDuckdbService.instance.with_connection do |conn|
+  # Get total events in DuckDB
+  puts "Step 1: Counting events to backfill..."
+  result = conn.query("SELECT COUNT(*) FROM events WHERE request_method IS NULL")
+  total_to_backfill = result.first&.first || 0
+
+  result = conn.query("SELECT COUNT(*) FROM events")
+  total_events = result.first&.first || 0
+
+  puts " Total events in DuckDB: #{total_events}"
+  puts " Events needing backfill: #{total_to_backfill}"
+
+  if total_to_backfill == 0
+    puts "\n✓ All events already have new columns populated!"
+    exit 0
+  end
+
+  # Get min and max event IDs in DuckDB
+  result = conn.query("SELECT MIN(id), MAX(id) FROM events WHERE request_method IS NULL")
+  min_id, max_id = result.first
+  puts " ID range to backfill: #{min_id} to #{max_id}"
+
+  puts "\nStep 2: Exporting PostgreSQL data in batches..."
+  current_id = min_id
+  batch_num = 0
+
+  # Create temporary CSV file for data transfer
+  temp_csv = Tempfile.new(['events_backfill', '.csv'])
+
+  begin
+    CSV.open(temp_csv.path, 'w') do |csv|
+      # Header
+      csv << ['id', 'request_method', 'response_status', 'rule_id']
+
+      while current_id <= max_id
+        batch_num += 1
+        batch_end_id = [current_id + BATCH_SIZE - 1, max_id].min
+
+        print " Batch #{batch_num}: Exporting IDs #{current_id}-#{batch_end_id}..."
+
+        # Fetch from PostgreSQL
+        pg_events = Event.where("id >= ? AND id <= ?", current_id, batch_end_id)
+                         .select(:id, :request_method, :response_status, :rule_id)
+
+        count = 0
+        pg_events.find_each do |event|
+          csv << [
+            event.id,
+            event.request_method,
+            event.response_status,
+            event.rule_id
+          ]
+          count += 1
+        end
+
+        puts " #{count} events"
+        current_id = batch_end_id + 1
+      end
+    end
+
+    temp_csv.close
+
+    puts "\n✓ Exported to temporary CSV: #{temp_csv.path}"
+    puts " File size: #{(File.size(temp_csv.path) / 1024.0 / 1024.0).round(2)} MB"
+
+    puts "\nStep 3: Loading CSV into temporary DuckDB table..."
+    conn.execute("DROP TABLE IF EXISTS events_updates")
+    conn.execute(<<~SQL)
+      CREATE TABLE events_updates (
+        id BIGINT,
+        request_method INTEGER,
+        response_status INTEGER,
+        rule_id BIGINT
+      )
+    SQL
+
+    conn.execute(<<~SQL)
+      COPY events_updates FROM '#{temp_csv.path}' (FORMAT CSV, HEADER TRUE, NULL '')
+    SQL
+
+    result = conn.query("SELECT COUNT(*) FROM events_updates")
+    loaded_count = result.first&.first || 0
+    puts "✓ Loaded #{loaded_count} rows into temporary table"
+
+    puts "\nStep 4: Performing bulk UPDATE via JOIN..."
+    start_time = Time.current
+
+    # DuckDB's efficient UPDATE...FROM syntax
+    conn.execute(<<~SQL)
+      UPDATE events
+      SET
+        request_method = events_updates.request_method,
+        response_status = events_updates.response_status,
+        rule_id = events_updates.rule_id
+      FROM events_updates
+      WHERE events.id = events_updates.id
+    SQL
+
+    duration = Time.current - start_time
+    puts "✓ Bulk update complete in #{duration.round(2)}s!"
+
+    puts "\nStep 5: Cleaning up temporary table..."
+    conn.execute("DROP TABLE events_updates")
+    puts "✓ Temporary table dropped"
+
+  ensure
+    # Clean up temp file
+    temp_csv.unlink if temp_csv
+  end
+
+  puts "\nStep 6: Verifying backfill..."
+  result = conn.query("SELECT COUNT(*) FROM events WHERE request_method IS NOT NULL OR response_status IS NOT NULL OR rule_id IS NOT NULL")
+  filled_count = result.first&.first || 0
+
+  result = conn.query("SELECT COUNT(*) FROM events WHERE request_method IS NULL AND response_status IS NULL AND rule_id IS NULL")
+  still_null_count = result.first&.first || 0
+
+  puts " Events with new columns populated: #{filled_count}"
+  puts " Events still with NULL columns: #{still_null_count}"
+
+  if still_null_count > 0
+    puts "\n⚠ Note: #{still_null_count} events still have NULL values."
+    puts " This is normal if those events don't exist in PostgreSQL anymore"
+    puts " (they may have been cleaned up due to retention policy)"
+  else
+    puts "\n✓ Backfill complete! All events have new columns populated."
+  end
+end
+
+puts "\n" + "=" * 60
+puts "Backfill complete!"
+puts "\nNext steps:"
+puts "1. Test the events index page to verify everything works"
+puts "2. Monitor performance improvements from DuckDB queries"