10 Commits

Author SHA1 Message Date
Dan Milne  225d970123  Update duckdb. use more duckdb. Fix the display of stats  2025-12-25 12:03:25 +11:00
Dan Milne  a0ff0edb73  Update duckdb. use more duckdb  2025-12-25 11:59:53 +11:00
Dan Milne  693851f664  Use only parquet files for events  2025-12-03 17:16:38 +11:00
Dan Milne  032243ba6a  Smarter backfill  2025-12-02 14:01:45 +11:00
Dan Milne  1aa77066a1  Catch exceptions in the process waf job, use symbols for actions, don't create a rule which matches a supernet rule.  2025-12-01 21:23:23 +11:00
Dan Milne  f0ad3b2c90  Two bug fixes  2025-12-01 19:54:47 +11:00
Dan Milne  54d9c3a0d9  Move version file, fix oidc, make jobs use envs  2025-12-01 15:59:26 +11:00
Dan Milne  3eddfe9f7e  Better version support  2025-11-30 13:31:16 +11:00
Dan Milne  179563022e  Drop add_headers - headers can now be added to meta[] to be applied for any action. Consolidate tagging in a service  2025-11-30 13:18:17 +11:00
Dan Milne  de2eb43e2b  More use of tags - drop add_header action -> allow + headers+tags  2025-11-20 11:55:04 +11:00
40 changed files with 2003 additions and 425 deletions

View File

@@ -27,7 +27,7 @@ RUN apt-get update -qq && \
*) \
echo "Unsupported platform: $TARGETPLATFORM" && exit 1 ;; \
esac && \
wget "https://install.duckdb.org/v1.4.2/libduckdb-linux-${DUCKDB_ARCH}.zip" -O /tmp/libduckdb.zip && \
wget "https://github.com/duckdb/duckdb/releases/download/v1.4.3/libduckdb-linux-${DUCKDB_ARCH}.zip" -O /tmp/libduckdb.zip && \
unzip /tmp/libduckdb.zip -d /tmp/duckdb && \
cp /tmp/duckdb/duckdb.h /tmp/duckdb/duckdb.hpp /usr/local/include/ && \
cp /tmp/duckdb/libduckdb.so /usr/local/lib/ && \

View File

@@ -1 +0,0 @@
0.2.0

View File

@@ -7,6 +7,9 @@ class AnalyticsController < ApplicationController
def index
authorize :analytics, :index?
# Track overall request time
request_start = Time.current
# Time period selector (default: last 24 hours)
@time_period = params[:period]&.to_sym || :day
@start_time = calculate_start_time(@time_period)
@@ -24,16 +27,16 @@ class AnalyticsController < ApplicationController
cache_key_base = "analytics/#{@time_period}/#{@start_time.to_i}"
# Core statistics - cached (uses DuckDB if available)
@total_events = Rails.cache.fetch("#{cache_key_base}/total_events", expires_in: cache_ttl) do
with_duckdb_fallback { EventDdb.count_since(@start_time) } ||
Event.where("timestamp >= ?", @start_time).count
end
stat_start = Time.current
@total_events = BaffleDl.count_since(@start_time)
Rails.logger.info "[Analytics Perf] Total events: #{((Time.current - stat_start) * 1000).round(1)}ms"
@total_rules = Rails.cache.fetch("analytics/total_rules", expires_in: 5.minutes) do
Rule.enabled.count
end
@network_ranges_with_events = Rails.cache.fetch("analytics/network_ranges_with_events", expires_in: 5.minutes) do
@network_ranges_with_events = BaffleDl.count_network_ranges_with_events(@start_time) ||
Rails.cache.fetch("analytics/network_ranges_with_events", expires_in: 5.minutes) do
NetworkRange.with_events.count
end
@@ -41,34 +44,26 @@ class AnalyticsController < ApplicationController
NetworkRange.count
end
# Event breakdown by action - cached (uses DuckDB if available)
# Event breakdown by action - use DuckDB directly for performance
stat_start = Time.current
@event_breakdown = Rails.cache.fetch("#{cache_key_base}/event_breakdown", expires_in: cache_ttl) do
with_duckdb_fallback { EventDdb.breakdown_by_action(@start_time) } ||
Event.where("timestamp >= ?", @start_time)
.group(:waf_action)
.count
BaffleDl.breakdown_by_action(@start_time) || {}
end
Rails.logger.info "[Analytics Perf] Event breakdown: #{((Time.current - stat_start) * 1000).round(1)}ms"
# Top countries by event count - cached (uses DuckDB if available)
# Top countries by event count - use DuckDB directly for performance
stat_start = Time.current
@top_countries = Rails.cache.fetch("#{cache_key_base}/top_countries", expires_in: cache_ttl) do
with_duckdb_fallback { EventDdb.top_countries(@start_time, 10) } ||
Event.where("timestamp >= ? AND country IS NOT NULL", @start_time)
.group(:country)
.count
.sort_by { |_, count| -count }
.first(10)
BaffleDl.top_countries(@start_time, 10) || []
end
Rails.logger.info "[Analytics Perf] Top countries: #{((Time.current - stat_start) * 1000).round(1)}ms"
# Top blocked IPs - cached (uses DuckDB if available)
# Top blocked IPs - use DuckDB directly for performance
stat_start = Time.current
@top_blocked_ips = Rails.cache.fetch("#{cache_key_base}/top_blocked_ips", expires_in: cache_ttl) do
with_duckdb_fallback { EventDdb.top_blocked_ips(@start_time, 10) } ||
Event.where("timestamp >= ?", @start_time)
.where(waf_action: 0) # deny action in enum
.group(:ip_address)
.count
.sort_by { |_, count| -count }
.first(10)
BaffleDl.top_blocked_ips(@start_time, 10) || []
end
Rails.logger.info "[Analytics Perf] Top blocked IPs: #{((Time.current - stat_start) * 1000).round(1)}ms"
# Network range intelligence breakdown - cached
@network_intelligence = Rails.cache.fetch("analytics/network_intelligence", expires_in: 10.minutes) do
@@ -105,7 +100,11 @@ class AnalyticsController < ApplicationController
end
# Prepare data for charts - split caching for current vs historical data
stat_start = Time.current
@chart_data = prepare_chart_data_with_split_cache(cache_key_base, cache_ttl)
Rails.logger.info "[Analytics Perf] Chart data: #{((Time.current - stat_start) * 1000).round(1)}ms"
Rails.logger.info "[Analytics Perf] TOTAL REQUEST: #{((Time.current - request_start) * 1000).round(1)}ms"
respond_to do |format|
format.html
@@ -120,8 +119,8 @@ class AnalyticsController < ApplicationController
@time_period = params[:period]&.to_sym || :day
@start_time = calculate_start_time(@time_period)
# Top networks by request volume - use DuckDB if available
network_stats = with_duckdb_fallback { EventDdb.top_networks(@start_time, 50) }
# Top networks by request volume - use DuckLake if available
network_stats = with_duckdb_fallback { BaffleDl.top_networks(@start_time, 50) }
if network_stats
# DuckDB path: array format [network_range_id, event_count, unique_ips]
@@ -185,24 +184,24 @@ class AnalyticsController < ApplicationController
# Network type breakdown with traffic stats
@network_breakdown = calculate_network_type_stats(@start_time)
# Company breakdown for top traffic sources - use DuckDB if available
@top_companies = with_duckdb_fallback { EventDdb.top_companies(@start_time, 20) } ||
# Company breakdown for top traffic sources - use DuckLake if available
@top_companies = with_duckdb_fallback { BaffleDl.top_companies(@start_time, 20) } ||
Event.where("timestamp >= ? AND company IS NOT NULL", @start_time)
.group(:company)
.select("company, COUNT(*) as event_count, COUNT(DISTINCT ip_address) as unique_ips, COUNT(DISTINCT network_range_id) as network_count")
.order("event_count DESC")
.limit(20)
# ASN breakdown - use DuckDB if available
@top_asns = with_duckdb_fallback { EventDdb.top_asns(@start_time, 15) } ||
# ASN breakdown - use DuckLake if available
@top_asns = with_duckdb_fallback { BaffleDl.top_asns(@start_time, 15) } ||
Event.where("timestamp >= ? AND asn IS NOT NULL", @start_time)
.group(:asn, :asn_org)
.select("asn, asn_org, COUNT(*) as event_count, COUNT(DISTINCT ip_address) as unique_ips, COUNT(DISTINCT network_range_id) as network_count")
.order("event_count DESC")
.limit(15)
# Geographic breakdown - use DuckDB if available
@top_countries = with_duckdb_fallback { EventDdb.top_countries_with_stats(@start_time, 15) } ||
# Geographic breakdown - use DuckLake if available
@top_countries = with_duckdb_fallback { BaffleDl.top_countries_with_stats(@start_time, 15) } ||
Event.where("timestamp >= ? AND country IS NOT NULL", @start_time)
.group(:country)
.select("country, COUNT(*) as event_count, COUNT(DISTINCT ip_address) as unique_ips")
@@ -242,19 +241,35 @@ class AnalyticsController < ApplicationController
end
def prepare_chart_data_with_split_cache(cache_key_base, cache_ttl)
# Generate timeline based on selected time period
case @time_period
when :hour
# Show last 60 minutes for hour view
timeline_data = Rails.cache.fetch("#{cache_key_base}/chart_hourly", expires_in: 1.minute) do
# For hour view, show minute-by-minute data for the last hour
(0..59).map do |minutes_ago|
time_point = minutes_ago.minutes.ago
count = Event.where("timestamp >= ? AND timestamp < ?", time_point, time_point + 1.minute).count
{
time_iso: time_point.iso8601,
total: count
}
end.reverse
end
when :day
# Show last 24 hours (existing logic)
# Split timeline into historical (completed hours) and current (incomplete hour)
# Historical hours are cached for full TTL, current hour cached briefly for freshness
# Cache historical hours (1-23 hours ago) - these are complete and won't change
# No expiration - will stick around until evicted by cache store (uses DuckDB if available)
historical_timeline = Rails.cache.fetch("#{cache_key_base}/chart_historical") do
# Use DuckDB directly for performance, no PostgreSQL fallback
historical_timeline = Rails.cache.fetch("#{cache_key_base}/chart_historical", expires_in: 1.hour) do
historical_start = 23.hours.ago.beginning_of_hour
current_hour_start = Time.current.beginning_of_hour
events_by_hour = with_duckdb_fallback { EventDdb.hourly_timeline(historical_start, current_hour_start) } ||
Event.where("timestamp >= ? AND timestamp < ?", historical_start, current_hour_start)
.group("DATE_TRUNC('hour', timestamp)")
.count
# Use DuckDB directly - if it fails, we'll show empty data rather than slow PostgreSQL
events_by_hour = BaffleDl.hourly_timeline(historical_start, current_hour_start) || {}
(1..23).map do |hour_ago|
hour_time = hour_ago.hours.ago.beginning_of_hour
@@ -263,7 +278,7 @@ class AnalyticsController < ApplicationController
time_iso: hour_time.iso8601,
total: events_by_hour[hour_key] || 0
}
end
end.reverse
end
# Current hour (0 hours ago) - cache very briefly since it's actively accumulating
@@ -280,6 +295,30 @@ class AnalyticsController < ApplicationController
# Combine current + historical for full 24-hour timeline
timeline_data = [current_hour_data] + historical_timeline
when :week, :month
# Show daily data for week/month views
days_to_show = @time_period == :week ? 7 : 30
timeline_data = Rails.cache.fetch("#{cache_key_base}/chart_daily_#{days_to_show}", expires_in: cache_ttl) do
historical_start = days_to_show.days.ago.beginning_of_day
current_day_end = Time.current.end_of_day
# Use DuckDB for all data including current day (max 1 minute delay)
daily_events = BaffleDl.daily_timeline(historical_start, current_day_end) || {}
(0..days_to_show-1).map do |days_ago|
day_time = days_ago.days.ago.beginning_of_day
{
time_iso: day_time.iso8601,
total: daily_events[day_time] || 0
}
end
end
else
# Default to 24 hours
timeline_data = []
end
# Action distribution and other chart data (cached with main cache)
other_chart_data = Rails.cache.fetch("#{cache_key_base}/chart_metadata", expires_in: cache_ttl) do
action_distribution = @event_breakdown.map do |action, count|
@@ -324,7 +363,7 @@ class AnalyticsController < ApplicationController
time_iso: hour_time.iso8601,
total: events_by_hour[hour_key] || 0
}
end
end.reverse
# Action distribution for pie chart
action_distribution = @event_breakdown.map do |action, count|
@@ -349,8 +388,8 @@ class AnalyticsController < ApplicationController
end
def calculate_network_type_stats(start_time)
# Try DuckDB first, fallback to PostgreSQL
duckdb_stats = with_duckdb_fallback { EventDdb.network_type_stats(start_time) }
# Try DuckLake first, fallback to PostgreSQL
duckdb_stats = with_duckdb_fallback { BaffleDl.network_type_stats(start_time) }
return duckdb_stats if duckdb_stats
@@ -398,8 +437,8 @@ class AnalyticsController < ApplicationController
end
def calculate_suspicious_patterns(start_time)
# Try DuckDB first, fallback to PostgreSQL
duckdb_patterns = with_duckdb_fallback { EventDdb.suspicious_patterns(start_time) }
# Try DuckLake first, fallback to PostgreSQL
duckdb_patterns = with_duckdb_fallback { BaffleDl.suspicious_patterns(start_time) }
return duckdb_patterns if duckdb_patterns
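For reference, the controller chains `with_duckdb_fallback { ... } || postgres_fallback` throughout; the helper itself is not part of this changeset. A minimal sketch of the assumed behaviour, returning the block's result or nil on any error so the `||` fallback kicks in:

# Hypothetical sketch of with_duckdb_fallback (not shown in this diff).
def with_duckdb_fallback
  yield
rescue StandardError => e
  Rails.logger.warn "[DuckDB] Falling back to PostgreSQL: #{e.message}"
  nil
end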

View File

@@ -20,34 +20,71 @@ class EventsController < ApplicationController
end
def index
@events = Event.includes(:network_range, :rule).order(timestamp: :desc)
Rails.logger.debug "Found #{@events.count} total events"
Rails.logger.debug "Action: #{params[:waf_action]}"
# Build filters hash from params
filters = {}
filters[:ip] = params[:ip] if params[:ip].present?
filters[:waf_action] = params[:waf_action] if params[:waf_action].present?
filters[:country] = params[:country] if params[:country].present?
filters[:rule_id] = params[:rule_id] if params[:rule_id].present?
filters[:company] = params[:company] if params[:company].present?
filters[:network_type] = params[:network_type] if params[:network_type].present?
filters[:asn] = params[:asn] if params[:asn].present?
filters[:exclude_bots] = params[:exclude_bots] if params[:exclude_bots] == "true"
# Apply filters
# Handle network_cidr filter (requires NetworkRange lookup)
if params[:network_cidr].present?
range = NetworkRange.find_by(network: params[:network_cidr])
filters[:network_range_id] = range.id if range
end
# Try DuckLake first, fallback to PostgreSQL if unavailable
result = BaffleDl.search(filters, page: params[:page]&.to_i || 1, per_page: 50)
if result
# DuckDB query succeeded
@pagy = Pagy.new(count: result[:total_count], page: result[:page], items: result[:per_page])
@events = result[:events]
# Load network_range associations for events that have network_range_id
network_range_ids = @events.map(&:network_range_id).compact.uniq
if network_range_ids.any?
network_ranges = NetworkRange.where(id: network_range_ids).index_by(&:id)
@events.each do |event|
event.network_range = network_ranges[event.network_range_id] if event.network_range_id
end
end
# Load rule associations if needed
rule_ids = @events.map(&:rule_id).compact.uniq
if rule_ids.any?
rules = Rule.where(id: rule_ids).index_by(&:id)
@events.each do |event|
event.rule = rules[event.rule_id] if event.rule_id
end
end
Rails.logger.debug "[DuckDB] Found #{result[:total_count]} total events, showing page #{result[:page]}"
else
# Fallback to PostgreSQL
Rails.logger.warn "[EventsController] DuckDB unavailable, falling back to PostgreSQL"
@events = Event.includes(:network_range, :rule).order(timestamp: :desc)
# Apply filters using ActiveRecord scopes
@events = @events.by_ip(params[:ip]) if params[:ip].present?
@events = @events.by_waf_action(params[:waf_action]) if params[:waf_action].present?
@events = @events.by_country(params[:country]) if params[:country].present?
@events = @events.where(rule_id: params[:rule_id]) if params[:rule_id].present?
# Network-based filters (now using denormalized columns)
@events = @events.by_company(params[:company]) if params[:company].present?
@events = @events.by_network_type(params[:network_type]) if params[:network_type].present?
@events = @events.by_asn(params[:asn]) if params[:asn].present?
@events = @events.by_network_cidr(params[:network_cidr]) if params[:network_cidr].present?
Rails.logger.debug "Events count after filtering: #{@events.count}"
# Debug info
Rails.logger.debug "Events count before pagination: #{@events.count}"
@events = @events.exclude_bots if params[:exclude_bots] == "true"
# Paginate
@pagy, @events = pagy(@events, items: 50)
# Network ranges are now preloaded via includes(:network_range)
# The denormalized network_range_id makes this much faster than IP containment lookups
Rails.logger.debug "Events count after pagination: #{@events.count}"
Rails.logger.debug "Pagy info: #{@pagy.count} total, #{@pagy.pages} pages"
Rails.logger.debug "[PostgreSQL] Events count: #{@pagy.count} total, #{@pagy.pages} pages"
end
end
end

View File

@@ -256,16 +256,16 @@ class NetworkRangesController < ApplicationController
def calculate_traffic_stats(network_range)
if network_range.persisted?
# Real network - use cached events_count for total requests (much more performant)
if network_range.events_count > 0
# Real network - check if network has events using DuckDB for performance
if network_range.has_events?
# Use indexed network_range_id for much better performance instead of expensive CIDR operator
# Include child network ranges to capture all traffic within this network block
network_ids = [network_range.id] + network_range.child_ranges.pluck(:id)
# Try DuckDB first for stats (much faster)
duckdb_stats = with_duckdb_fallback { EventDdb.network_traffic_stats(network_ids) }
duckdb_top_paths = with_duckdb_fallback { EventDdb.network_top_paths(network_ids, 10) }
duckdb_top_agents = with_duckdb_fallback { EventDdb.network_top_user_agents(network_ids, 5) }
# Try DuckLake first for stats (much faster)
duckdb_stats = with_duckdb_fallback { BaffleDl.network_traffic_stats(network_ids) }
duckdb_top_paths = with_duckdb_fallback { BaffleDl.network_top_paths(network_ids, 10) }
duckdb_top_agents = with_duckdb_fallback { BaffleDl.network_top_user_agents(network_ids, 5) }
if duckdb_stats
# DuckDB success - use fast aggregated stats

View File

@@ -40,9 +40,13 @@ class OidcAuthController < ApplicationController
# Add PKCE verifier if available
code_verifier = retrieve_pkce_verifier
oidc_client.code_verifier = code_verifier if code_verifier.present?
access_token = oidc_client.access_token!
# Pass code_verifier as parameter to access_token! method (PKCE support)
access_token = if code_verifier.present?
oidc_client.access_token!(:body, code_verifier: code_verifier)
else
oidc_client.access_token!
end
# Extract claims from ID token (JWT-only approach)
id_token = access_token.id_token
@@ -171,7 +175,7 @@ class OidcAuthController < ApplicationController
# JWT claim extraction and validation
def extract_claims_from_id_token(id_token)
# Decode JWT without verification first to get claims
decoded_jwt = JWT.decode(id_token, nil, false).first
decoded_jwt = JSON::JWT.decode(id_token, :skip_verification)
{
sub: decoded_jwt['sub'],
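The claim extraction now goes through the json-jwt gem rather than ruby-jwt. A minimal sketch of that call pattern, with a placeholder token string rather than a real JWT:

# Sketch: decode an ID token with json-jwt, skipping signature verification
# (signature checks are assumed to happen elsewhere in the flow).
require 'json/jwt'

raw_id_token = "eyJ..." # placeholder, truncated
claims = JSON::JWT.decode(raw_id_token, :skip_verification)
claims['sub']   # subject identifier
claims['email'] # email claim, when present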

View File

@@ -261,12 +261,6 @@ def process_quick_create_parameters
# Ensure metadata is a hash
@rule.metadata = {} unless @rule.metadata.is_a?(Hash)
# Handle add_header fields - use provided params or existing metadata values
if @rule.add_header_action? && (params[:header_name].present? || params[:header_value].present?)
@rule.metadata['header_name'] = params[:header_name].presence || @rule.metadata['header_name'] || 'X-Bot-Agent'
@rule.metadata['header_value'] = params[:header_value].presence || @rule.metadata['header_value'] || 'Unknown'
end
# Handle expires_at parsing for text input
if params.dig(:rule, :expires_at).present?
expires_at_str = params[:rule][:expires_at].strip

View File

@@ -37,20 +37,49 @@ export default class extends Controller {
// Convert ISO time to local time
const date = new Date(timeIso)
const localTime = date.toLocaleTimeString(undefined, {
// Determine if we should show date based on time range
const now = new Date()
const timeDiff = now - date
const hoursDiff = timeDiff / (1000 * 60 * 60)
let displayTime
if (hoursDiff > 25) {
// For periods longer than 25 hours, show date only (no time)
displayTime = date.toLocaleDateString(undefined, {
month: 'short',
day: 'numeric'
})
} else {
// Check if this is midnight UTC data (daily timeline) vs actual time data (hourly timeline)
// Daily timeline: time is at UTC midnight (hours/minutes/seconds = 0)
// Hourly timeline: time has actual hours/minutes
const utcHours = date.getUTCHours()
const utcMinutes = date.getUTCMinutes()
const utcSeconds = date.getUTCSeconds()
if (utcHours === 0 && utcMinutes === 0 && utcSeconds === 0) {
// This is midnight UTC - treat as daily data, show date only
displayTime = date.toLocaleDateString(undefined, {
month: 'short',
day: 'numeric'
})
} else {
// This is actual time data - show time only
displayTime = date.toLocaleTimeString(undefined, {
hour: '2-digit',
minute: '2-digit',
hour12: false
})
}
}
timeElement.textContent = localTime
timeElement.textContent = displayTime
timeElement.title = date.toLocaleString(undefined, {
weekday: 'short',
year: 'numeric',
month: 'short',
day: 'numeric',
hour: '2-digit',
minute: '2-digit',
timeZoneName: 'short'
})

View File

@@ -0,0 +1,52 @@
# frozen_string_literal: true
# One-time job to bootstrap Parquet export system
# Exports all existing DuckDB data to weekly Parquet archives
# Run this once when setting up Parquet exports for the first time
#
# Usage:
# BootstrapParquetExportJob.perform_now
# # or via docker:
# docker compose exec jobs bin/rails runner "BootstrapParquetExportJob.perform_now"
class BootstrapParquetExportJob < ApplicationJob
queue_as :default
def perform
service = AnalyticsDuckdbService.instance
# Check if DuckDB has any data
event_count = service.event_count
Rails.logger.info "[Parquet Bootstrap] DuckDB event count: #{event_count}"
if event_count == 0
Rails.logger.warn "[Parquet Bootstrap] No events in DuckDB. Run SyncEventsToDuckdbJob first."
return
end
# Check if Parquet files already exist
existing_weeks = Dir.glob(AnalyticsDuckdbService::PARQUET_WEEKS_PATH.join("*.parquet")).size
if existing_weeks > 0
Rails.logger.info "[Parquet Bootstrap] Found #{existing_weeks} existing week archives"
end
Rails.logger.info "[Parquet Bootstrap] Starting export of all DuckDB data to Parquet..."
start_time = Time.current
# Run the bootstrap export
service.export_all_to_parquet
duration = Time.current - start_time
week_count = Dir.glob(AnalyticsDuckdbService::PARQUET_WEEKS_PATH.join("*.parquet")).size
Rails.logger.info "[Parquet Bootstrap] Complete!"
Rails.logger.info "[Parquet Bootstrap] - Time taken: #{duration.round(2)} seconds"
Rails.logger.info "[Parquet Bootstrap] - Week archives: #{week_count}"
Rails.logger.info "[Parquet Bootstrap] - Storage: #{AnalyticsDuckdbService::PARQUET_BASE_PATH}"
Rails.logger.info "[Parquet Bootstrap] System is ready - jobs will maintain exports automatically"
rescue StandardError => e
Rails.logger.error "[Parquet Bootstrap] Job failed: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
raise # Re-raise to mark job as failed
end
end

View File

@@ -0,0 +1,25 @@
# frozen_string_literal: true
# Background job to consolidate completed hour into day file
# Runs at :05 past each hour (e.g., 01:05, 02:05, etc.)
# Merges the previous hour's data into the day file and deletes the hour file
class ConsolidateParquetHourlyJob < ApplicationJob
queue_as :default
def perform
service = AnalyticsDuckdbService.instance
# Consolidate the previous hour (not current hour, which is still being written)
previous_hour = 1.hour.ago
Rails.logger.info "[Parquet Consolidate] Starting hourly consolidation for #{previous_hour.strftime('%Y-%m-%d %H:00')}"
service.consolidate_hour_to_day(previous_hour)
Rails.logger.info "[Parquet Consolidate] Hourly consolidation complete"
rescue StandardError => e
Rails.logger.error "[Parquet Consolidate] Hourly job failed: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
raise # Re-raise to mark job as failed in Solid Queue
end
end

View File

@@ -0,0 +1,25 @@
# frozen_string_literal: true
# Background job to consolidate completed week into archive
# Runs Monday at 00:05 (start of new week)
# Merges the previous week's day files into a week archive and deletes day files
class ConsolidateParquetWeeklyJob < ApplicationJob
queue_as :default
def perform
service = AnalyticsDuckdbService.instance
# Consolidate the previous week (Monday to Sunday)
previous_week_start = 1.week.ago.beginning_of_week
Rails.logger.info "[Parquet Consolidate] Starting weekly consolidation for week starting #{previous_week_start.strftime('%Y-%m-%d')}"
service.consolidate_days_to_week(previous_week_start)
Rails.logger.info "[Parquet Consolidate] Weekly consolidation complete"
rescue StandardError => e
Rails.logger.error "[Parquet Consolidate] Weekly job failed: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
raise # Re-raise to mark job as failed in Solid Queue
end
end
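Like the bootstrap job above, both consolidation jobs can be kicked off manually with the same runner pattern; a quick sketch:

# Manual invocation (same `bin/rails runner` pattern as BootstrapParquetExportJob).
ConsolidateParquetHourlyJob.perform_now  # fold the previous hour into its day file
ConsolidateParquetWeeklyJob.perform_now  # fold last week's day files into a week archive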

View File

@@ -79,6 +79,7 @@ class ProcessWafEventJob < ApplicationJob
Rails.logger.debug "Network processing took #{((Time.current - network_start) * 1000).round(2)}ms"
rescue => e
Rails.logger.warn "Failed to process network range for event #{event.id}: #{e.message}"
Sentry.capture_exception(e)
end
elsif event.ip_address.present?
Rails.logger.warn "Event #{event.id} has IP but no network_range_id (private IP?)"

View File

@@ -1,89 +0,0 @@
# frozen_string_literal: true
# Background job to sync events from PostgreSQL to DuckDB
# Runs every 5 minutes to keep analytics database up-to-date
# Uses watermark tracking to only sync new events
class SyncEventsToDuckdbJob < ApplicationJob
queue_as :default
# Key for storing last sync timestamp in Rails cache
WATERMARK_CACHE_KEY = "duckdb_last_sync_time"
WATERMARK_TTL = 1.week
# Overlap window to catch late-arriving events
SYNC_OVERLAP = 1.minute
def perform
service = AnalyticsDuckdbService.instance
# Determine where to start syncing
from_timestamp = determine_sync_start_time(service)
Rails.logger.info "[DuckDB Sync] Starting sync from #{from_timestamp}"
# Sync new events using PostgreSQL cursor + DuckDB Appender
# (setup_schema is called internally within sync_new_events)
count = service.sync_new_events(from_timestamp)
# Update watermark if we synced any events
if count > 0
update_last_sync_time
Rails.logger.info "[DuckDB Sync] Successfully synced #{count} events"
else
Rails.logger.info "[DuckDB Sync] No new events to sync"
end
rescue StandardError => e
Rails.logger.error "[DuckDB Sync] Job failed: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
raise # Re-raise to mark job as failed in Solid Queue
end
private
# Determine timestamp to start syncing from
# Strategy:
# 1. First run (DuckDB empty): sync from oldest PostgreSQL event
# 2. Subsequent runs: sync from last watermark with overlap
def determine_sync_start_time(service)
oldest_duckdb = service.oldest_event_timestamp
if oldest_duckdb.nil?
# DuckDB is empty - this is the first sync
# Start from oldest PostgreSQL event (or reasonable cutoff)
oldest_pg = Event.minimum(:timestamp)
if oldest_pg.nil?
# No events in PostgreSQL at all
Rails.logger.warn "[DuckDB Sync] No events found in PostgreSQL"
1.day.ago # Default to recent window
else
Rails.logger.info "[DuckDB Sync] First sync - starting from oldest event: #{oldest_pg}"
oldest_pg
end
else
# DuckDB has data - sync from last watermark with overlap
last_sync = Rails.cache.read(WATERMARK_CACHE_KEY)
if last_sync.nil?
# Watermark not in cache (maybe cache expired or restarted)
# Fall back to newest event in DuckDB
newest_duckdb = service.newest_event_timestamp
start_time = newest_duckdb ? newest_duckdb - SYNC_OVERLAP : oldest_duckdb
Rails.logger.info "[DuckDB Sync] Watermark not found, using newest DuckDB event: #{start_time}"
start_time
else
# Normal case: use watermark with overlap to catch late arrivals
start_time = last_sync - SYNC_OVERLAP
Rails.logger.debug "[DuckDB Sync] Using watermark: #{last_sync} (with #{SYNC_OVERLAP}s overlap)"
start_time
end
end
end
# Update last sync watermark in cache
def update_last_sync_time
now = Time.current
Rails.cache.write(WATERMARK_CACHE_KEY, now, expires_in: WATERMARK_TTL)
Rails.logger.debug "[DuckDB Sync] Updated watermark to #{now}"
end
end

View File

@@ -105,6 +105,11 @@ class Event < ApplicationRecord
joins(:network_range).where("network_ranges.network = ?", cidr)
}
# Bot filtering scopes
scope :bots, -> { where(is_bot: true) }
scope :humans, -> { where(is_bot: false) }
scope :exclude_bots, -> { where(is_bot: false) }
# Add association for the optional network_range_id
belongs_to :network_range, optional: true
@@ -191,6 +196,9 @@ class Event < ApplicationRecord
# Populate network intelligence from IP address
before_save :populate_network_intelligence, if: :should_populate_network_intelligence?
# Detect bot traffic using user agent and network intelligence
before_save :detect_bot_traffic, if: :should_detect_bot?
# Backfill network intelligence for all events
def self.backfill_network_intelligence!(batch_size: 10_000)
total = where(country: nil).count
@@ -218,8 +226,8 @@ class Event < ApplicationRecord
# Normalize headers in payload during import phase
normalized_payload = normalize_payload_headers(payload)
# Create the WAF request event
create!(
# Create the WAF request event with agent-provided tags
event = create!(
request_id: request_id,
timestamp: parse_timestamp(normalized_payload["timestamp"]),
payload: normalized_payload,
@@ -242,11 +250,18 @@ class Event < ApplicationRecord
server_name: normalized_payload["server_name"],
environment: normalized_payload["environment"],
# Tags: start with agent-provided tags only
tags: normalized_payload["tags"] || [],
# WAF agent info
agent_version: normalized_payload.dig("agent", "version"),
agent_name: normalized_payload.dig("agent", "name")
)
# Apply rule tags using EventTagger service
EventTagger.tag_event(event)
event
end
# Normalize headers in payload to lower case during import phase
@@ -339,7 +354,10 @@ class Event < ApplicationRecord
def tags
# Use the dedicated tags column (array), fallback to payload during transition
super.presence || (payload&.dig("tags") || [])
# Ensure we always return an Array, even if payload has malformed data (e.g., {} instead of [])
result = super.presence || payload&.dig("tags")
return [] if result.nil?
result.is_a?(Array) ? result : []
end
def headers
@@ -699,4 +717,82 @@ class Event < ApplicationRecord
self.agent_version = agent_data["version"]
self.agent_name = agent_data["name"]
end
def should_detect_bot?
# Detect bots if user agent is present or if we have network intelligence
user_agent.present? || network_range_id.present?
end
def detect_bot_traffic
self.is_bot = bot_detected?
rescue => e
Rails.logger.error "Failed to detect bot for event #{id}: #{e.message}"
self.is_bot = false # Default to non-bot on error
end
def bot_detected?
# Multi-signal bot detection approach with tagging:
# 1. User agent detection (DeviceDetector gem) - adds bot:name tag
# 2. Network range source matching (bot_import_* sources) - adds network tags
# 3. Fallback to datacenter classification for infrastructure-based detection
# Signal 1: User agent bot detection (uses DeviceDetector's built-in cache)
if user_agent.present?
begin
detector = DeviceDetector.new(user_agent)
if detector.bot?
# Add bot tag with specific bot name
bot_name = detector.bot_name&.downcase&.gsub(/\s+/, '_') || 'unknown'
add_tag("bot:#{bot_name}")
return true
end
rescue => e
Rails.logger.debug "DeviceDetector failed for user agent: #{e.message}"
end
end
# Signal 2: Network range from known bot sources
if network_range_id.present?
range = NetworkRange.find_by(id: network_range_id)
if range
# Check if the network range source indicates a bot import
if range.source&.start_with?('bot_import_')
# Extract bot type from source (e.g., 'bot_import_googlebot' -> 'googlebot')
bot_type = range.source.sub('bot_import_', '')
add_tag("bot:#{bot_type}")
add_tag("network:#{range.company&.downcase&.gsub(/\s+/, '_')}") if range.company.present?
return true
end
# Check if the company is a known bot provider (from bot imports)
# Common bot companies: Google, Amazon, OpenAI, Cloudflare, Microsoft, etc.
known_bot_companies = ['googlebot', 'google bot', 'amazon', 'aws', 'openai',
'anthropic', 'cloudflare', 'microsoft', 'facebook',
'meta', 'apple', 'duckduckgo']
company_lower = company&.downcase
if company_lower && known_bot_companies.any? { |bot| company_lower.include?(bot) }
add_tag("bot:#{company_lower.gsub(/\s+/, '_')}")
add_tag("network:#{company_lower.gsub(/\s+/, '_')}")
return true
end
end
end
# Signal 3: Datacenter traffic is often bot traffic
# However, this is less precise so we use it as a weaker signal
# Only mark as bot if datacenter AND has other suspicious characteristics
if is_datacenter && user_agent.present?
# Generic/common bot user agents in datacenter networks
ua_lower = user_agent.downcase
bot_keywords = ['bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python', 'go-http-client']
if bot_keywords.any? { |keyword| ua_lower.include?(keyword) }
add_tag("bot:datacenter")
add_tag("datacenter:true")
return true
end
end
# Default: not a bot
false
end
end
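The bot-detection code calls an add_tag helper that is not included in this hunk. A minimal sketch, assuming it appends to the tags array column and skips duplicates:

# Hypothetical add_tag helper assumed by detect_bot_traffic above.
def add_tag(tag)
  current = Array(tags)
  return if current.include?(tag.to_s)
  self.tags = current + [tag.to_s]
end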

View File

@@ -2,19 +2,54 @@
require 'ostruct'
# EventDdb - DuckDB-backed analytics queries for events
# Provides an ActiveRecord-like interface for querying DuckDB events table
# Falls back to PostgreSQL Event model if DuckDB is unavailable
# EventDdb - DuckLake-backed analytics queries for events
# Provides an ActiveRecord-like interface for querying DuckLake events table
# Falls back to PostgreSQL Event model if DuckLake is unavailable
class EventDdb
# Enum mappings from integer to string (matching Event model)
ACTION_MAP = {
0 => "deny",
1 => "allow",
2 => "redirect",
3 => "challenge",
4 => "log"
}.freeze
METHOD_MAP = {
0 => "get",
1 => "post",
2 => "put",
3 => "patch",
4 => "delete",
5 => "head",
6 => "options"
}.freeze
class << self
# Get DuckDB service
# Get DuckLake service
def service
AnalyticsDuckdbService.instance
AnalyticsDucklakeService.new
end
# Helper to work with DuckLake events table
# This allows all existing queries to work without modification
def with_events_from_parquet(&block)
service.with_connection do |conn|
# Ensure schema exists
service.setup_schema(conn)
# Use the DuckLake events table directly
# DuckLake automatically manages the Parquet files underneath
yield conn
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error accessing DuckLake events: #{e.message}"
nil
end
# Total events since timestamp
def count_since(start_time)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query("SELECT COUNT(*) as count FROM events WHERE timestamp >= ?", start_time)
result.first&.first || 0
end
@@ -25,7 +60,7 @@ class EventDdb
# Event breakdown by WAF action
def breakdown_by_action(start_time)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time)
SELECT waf_action, COUNT(*) as count
FROM events
@@ -34,7 +69,10 @@ class EventDdb
SQL
# Convert to hash like ActiveRecord .group.count returns
result.to_a.to_h { |row| [row["waf_action"], row["count"]] }
# DuckDB returns integer enum values, map to string names
# 0=deny, 1=allow, 2=redirect, 3=challenge, 4=log
action_map = { 0 => "deny", 1 => "allow", 2 => "redirect", 3 => "challenge", 4 => "log" }
result.to_a.to_h { |row| [action_map[row[0]] || "unknown", row[1]] }
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in breakdown_by_action: #{e.message}"
@@ -43,7 +81,7 @@ class EventDdb
# Top countries with event counts
def top_countries(start_time, limit = 10)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time, limit)
SELECT country, COUNT(*) as count
FROM events
@@ -54,7 +92,8 @@ class EventDdb
SQL
# Return array of [country, count] tuples like ActiveRecord
result.to_a.map { |row| [row["country"], row["count"]] }
# DuckDB returns arrays: [country, count]
result.to_a.map { |row| [row[0], row[1]] }
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in top_countries: #{e.message}"
@@ -63,7 +102,7 @@ class EventDdb
# Top blocked IPs
def top_blocked_ips(start_time, limit = 10)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time, limit)
SELECT ip_address, COUNT(*) as count
FROM events
@@ -73,7 +112,8 @@ class EventDdb
LIMIT ?
SQL
result.to_a.map { |row| [row["ip_address"], row["count"]] }
# DuckDB returns arrays: [ip_address, count]
result.to_a.map { |row| [row[0], row[1]] }
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in top_blocked_ips: #{e.message}"
@@ -82,7 +122,7 @@ class EventDdb
# Hourly timeline aggregation
def hourly_timeline(start_time, end_time)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time, end_time)
SELECT
DATE_TRUNC('hour', timestamp) as hour,
@@ -94,7 +134,8 @@ class EventDdb
SQL
# Convert to hash with Time keys like ActiveRecord
result.to_a.to_h { |row| [row["hour"], row["count"]] }
# DuckDB returns arrays: [hour, count]
result.to_a.to_h { |row| [row[0], row[1]] }
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in hourly_timeline: #{e.message}"
@@ -104,7 +145,7 @@ class EventDdb
# Top networks by traffic volume
# Returns array of arrays: [network_range_id, event_count, unique_ips]
def top_networks(start_time, limit = 50)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time, limit)
SELECT
network_range_id,
@@ -127,7 +168,7 @@ class EventDdb
# Top companies
# Returns array of OpenStruct objects with: company, event_count, unique_ips, network_count
def top_companies(start_time, limit = 20)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time, limit)
SELECT
company,
@@ -159,7 +200,7 @@ class EventDdb
# Top ASNs
# Returns array of OpenStruct objects with: asn, asn_org, event_count, unique_ips, network_count
def top_asns(start_time, limit = 15)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time, limit)
SELECT
asn,
@@ -193,7 +234,7 @@ class EventDdb
# Network type breakdown (datacenter, VPN, proxy, standard)
# Returns hash with network_type as key and hash of stats as value
def network_type_breakdown(start_time)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time)
SELECT
CASE
@@ -230,7 +271,7 @@ class EventDdb
# Top countries with detailed stats (event count and unique IPs)
# Returns array of OpenStruct objects with: country, event_count, unique_ips
def top_countries_with_stats(start_time, limit = 15)
service.with_connection do |conn|
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time, limit)
SELECT
country,
@@ -260,7 +301,7 @@ class EventDdb
# Network type stats with formatted output matching controller expectations
# Returns hash with type keys containing label, networks, events, unique_ips, percentage
def network_type_stats(start_time)
service.with_connection do |conn|
with_events_from_parquet do |conn|
# Get total events for percentage calculation
total_result = conn.query("SELECT COUNT(*) as total FROM events WHERE timestamp >= ?", start_time)
total_events = total_result.first&.first || 0
@@ -303,7 +344,7 @@ class EventDdb
network_range_ids = Array(network_range_ids)
return nil if network_range_ids.empty?
service.with_connection do |conn|
with_events_from_parquet do |conn|
# Build IN clause with placeholders
placeholders = network_range_ids.map { "?" }.join(", ")
@@ -338,7 +379,7 @@ class EventDdb
network_range_ids = Array(network_range_ids)
return nil if network_range_ids.empty?
service.with_connection do |conn|
with_events_from_parquet do |conn|
# Build IN clause with placeholders
placeholders = network_range_ids.map { "?" }.join(", ")
@@ -366,7 +407,7 @@ class EventDdb
network_range_ids = Array(network_range_ids)
return nil if network_range_ids.empty?
service.with_connection do |conn|
with_events_from_parquet do |conn|
# Build IN clause with placeholders
placeholders = network_range_ids.map { "?" }.join(", ")
@@ -389,13 +430,36 @@ class EventDdb
nil
end
# Count events for network range(s)
# Returns integer count of all events in the network
def network_event_count(network_range_ids)
network_range_ids = Array(network_range_ids)
return nil if network_range_ids.empty?
with_events_from_parquet do |conn|
# Build IN clause with placeholders
placeholders = network_range_ids.map { "?" }.join(", ")
result = conn.query(<<~SQL, *network_range_ids)
SELECT COUNT(*) as count
FROM events
WHERE network_range_id IN (#{placeholders})
SQL
result.first&.first || 0
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in network_event_count: #{e.message}"
nil
end
# Full user agent tally for network range(s)
# Returns hash of user_agent => count for all agents in the network
def network_agent_tally(network_range_ids)
network_range_ids = Array(network_range_ids)
return nil if network_range_ids.empty?
service.with_connection do |conn|
with_events_from_parquet do |conn|
# Build IN clause with placeholders
placeholders = network_range_ids.map { "?" }.join(", ")
@@ -420,7 +484,7 @@ class EventDdb
# Suspicious network activity patterns
# Detects high-volume networks, high deny rates, and distributed companies
def suspicious_patterns(start_time)
service.with_connection do |conn|
with_events_from_parquet do |conn|
# High volume networks (5x average)
avg_query = conn.query(<<~SQL, start_time)
SELECT
@@ -495,5 +559,280 @@ class EventDdb
Rails.logger.error "[EventDdb] Error in suspicious_patterns: #{e.message}"
nil
end
# Bot traffic analysis - breakdown of bot vs human traffic
def bot_traffic_breakdown(start_time)
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time)
SELECT
is_bot,
COUNT(*) as event_count,
COUNT(DISTINCT ip_address) as unique_ips
FROM events
WHERE timestamp >= ?
GROUP BY is_bot
SQL
# Convert to hash: is_bot => { event_count, unique_ips }
# DuckDB returns arrays: [is_bot, event_count, unique_ips]
result.to_a.to_h do |row|
[
row[0] ? "bot" : "human", # row[0] = is_bot
{
"event_count" => row[1], # row[1] = event_count
"unique_ips" => row[2] # row[2] = unique_ips
}
]
end
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in bot_traffic_breakdown: #{e.message}"
nil
end
# Count human traffic (non-bot) since timestamp
def human_traffic_count(start_time)
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time)
SELECT COUNT(*) as count
FROM events
WHERE timestamp >= ? AND is_bot = false
SQL
result.first&.first || 0
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in human_traffic_count: #{e.message}"
nil
end
# Count bot traffic since timestamp
def bot_traffic_count(start_time)
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time)
SELECT COUNT(*) as count
FROM events
WHERE timestamp >= ? AND is_bot = true
SQL
result.first&.first || 0
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in bot_traffic_count: #{e.message}"
nil
end
# Top bot user agents
def top_bot_user_agents(start_time, limit = 20)
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time, limit)
SELECT
user_agent,
COUNT(*) as event_count,
COUNT(DISTINCT ip_address) as unique_ips
FROM events
WHERE timestamp >= ? AND is_bot = true AND user_agent IS NOT NULL
GROUP BY user_agent
ORDER BY event_count DESC
LIMIT ?
SQL
# DuckDB returns arrays: [user_agent, event_count, unique_ips]
result.to_a.map do |row|
{
user_agent: row[0], # row[0] = user_agent
event_count: row[1], # row[1] = event_count
unique_ips: row[2] # row[2] = unique_ips
}
end
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in top_bot_user_agents: #{e.message}"
nil
end
# Bot traffic timeline (hourly breakdown)
def bot_traffic_timeline(start_time, end_time)
with_events_from_parquet do |conn|
result = conn.query(<<~SQL, start_time, end_time)
SELECT
DATE_TRUNC('hour', timestamp) as hour,
SUM(CASE WHEN is_bot = true THEN 1 ELSE 0 END) as bot_count,
SUM(CASE WHEN is_bot = false THEN 1 ELSE 0 END) as human_count
FROM events
WHERE timestamp >= ? AND timestamp < ?
GROUP BY hour
ORDER BY hour
SQL
# Convert to hash with Time keys
# DuckDB returns arrays: [hour, bot_count, human_count]
result.to_a.to_h do |row|
[
row[0], # row[0] = hour
{
"bot_count" => row[1], # row[1] = bot_count
"human_count" => row[2], # row[2] = human_count
"total" => row[1] + row[2]
}
]
end
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in bot_traffic_timeline: #{e.message}"
nil
end
# Search events with filters and pagination
# Returns { total_count:, events:[], page:, per_page: }
# Supports filters: ip, waf_action, country, rule_id, company, asn, network_type, network_range_id, exclude_bots, request_path
def search(filters = {}, page: 1, per_page: 50)
with_events_from_parquet do |conn|
# Build WHERE clause
where_clause, params = build_where_clause(filters)
# Get total count
count_sql = "SELECT COUNT(*) FROM baffle.events#{where_clause}"
count_result = conn.query(count_sql, *params)
total_count = count_result.first&.first || 0
# Get paginated results
offset = (page - 1) * per_page
data_sql = <<~SQL
SELECT
id, timestamp, ip_address, network_range_id, country, company,
asn, asn_org, is_datacenter, is_vpn, is_proxy, is_bot,
waf_action, request_method, response_status, rule_id,
request_path, user_agent, tags
FROM baffle.events
#{where_clause}
ORDER BY timestamp DESC
LIMIT ? OFFSET ?
SQL
result = conn.query(data_sql, *params, per_page, offset)
# Convert rows to event-like objects
events = result.to_a.map { |row| row_to_event(row) }
{
total_count: total_count,
events: events,
page: page,
per_page: per_page
}
end
rescue StandardError => e
Rails.logger.error "[EventDdb] Error in DuckLake search: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
nil
end
private
# Build WHERE clause and params from filters hash
# Returns [where_clause_string, params_array]
def build_where_clause(filters)
conditions = []
params = []
if filters[:ip].present?
conditions << "ip_address = ?"
params << filters[:ip]
end
if filters[:waf_action].present?
# Convert string action to integer
action_int = ACTION_MAP.key(filters[:waf_action].to_s)
if action_int
conditions << "waf_action = ?"
params << action_int
end
end
if filters[:country].present?
conditions << "country = ?"
params << filters[:country]
end
if filters[:rule_id].present?
conditions << "rule_id = ?"
params << filters[:rule_id].to_i
end
if filters[:company].present?
conditions << "company ILIKE ?"
params << "%#{filters[:company]}%"
end
if filters[:asn].present?
conditions << "asn = ?"
params << filters[:asn].to_i
end
if filters[:network_range_id].present?
conditions << "network_range_id = ?"
params << filters[:network_range_id].to_i
end
# Network type filter
if filters[:network_type].present?
case filters[:network_type].to_s.downcase
when "datacenter"
conditions << "is_datacenter = true"
when "vpn"
conditions << "is_vpn = true"
when "proxy"
conditions << "is_proxy = true"
when "standard"
conditions << "(is_datacenter = false AND is_vpn = false AND is_proxy = false)"
end
end
# Path filtering
if filters[:request_path].present?
conditions << "request_path = ?"
params << filters[:request_path]
end
# Bot filtering
if filters[:exclude_bots] == true || filters[:exclude_bots] == "true"
conditions << "is_bot = false"
end
where_clause = conditions.any? ? " WHERE #{conditions.join(' AND ')}" : ""
[where_clause, params]
end
# Convert DuckDB row array to event-like OpenStruct
def row_to_event(row)
OpenStruct.new(
id: row[0],
timestamp: row[1],
ip_address: row[2],
network_range_id: row[3],
country: row[4],
company: row[5],
asn: row[6],
asn_org: row[7],
is_datacenter: row[8],
is_vpn: row[9],
is_proxy: row[10],
is_bot: row[11],
waf_action: ACTION_MAP[row[12]] || "unknown",
request_method: METHOD_MAP[row[13]],
response_status: row[14],
rule_id: row[15],
request_path: row[16],
user_agent: row[17],
tags: row[18] || [],
# Add helper method for country lookup
lookup_country: row[4],
# Network range will be loaded separately in controller
network_range: nil,
rule: nil
)
end
end
end
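A usage sketch for the new search API, assuming the DuckLake catalog is reachable; on failure it returns nil and callers fall back to PostgreSQL:

# Example: deny events from Australia with bots excluded (sketch).
result = EventDdb.search(
  { waf_action: "deny", country: "AU", exclude_bots: "true" },
  page: 1,
  per_page: 50
)
if result
  result[:total_count]               # total matching events
  result[:events].first&.ip_address  # OpenStruct rows built by row_to_event
end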

View File

@@ -7,10 +7,15 @@
# and classification flags (datacenter, proxy, VPN).
class NetworkRange < ApplicationRecord
# Sources for network range creation
SOURCES = %w[api_imported user_created manual auto_generated inherited geolite_asn geolite_country].freeze
SOURCES = %w[api_imported user_created manual auto_generated inherited geolite_asn geolite_country
bot_import_amazon_aws bot_import_google bot_import_microsoft_bing bot_import_anthropic
bot_import_openai_searchbot bot_import_openai_chatgpt_user bot_import_openai_gptbot
bot_import_cloudflare bot_import_facebook bot_import_applebot bot_import_duckduckgo
production_import].freeze
# Associations
has_many :rules, dependent: :destroy
has_many :events, foreign_key: :network_range_id, dependent: :nullify
belongs_to :user, optional: true
# Validations
@@ -32,8 +37,8 @@ class NetworkRange < ApplicationRecord
scope :geolite_imported, -> { where(source: ['geolite_asn', 'geolite_country']) }
scope :geolite_asn, -> { where(source: 'geolite_asn') }
scope :geolite_country, -> { where(source: 'geolite_country') }
scope :with_events, -> { where("events_count > 0") }
scope :most_active, -> { order(events_count: :desc) }
scope :with_events, -> { joins(:events).distinct }
scope :most_active, -> { joins(:events).group('network_ranges.id').order('COUNT(events.id) DESC') }
# Callbacks
before_validation :set_default_source
@@ -116,19 +121,19 @@ class NetworkRange < ApplicationRecord
# Parent/child relationships
def parent_ranges
NetworkRange.where("?::inet << network AND masklen(network) < ?", network.to_s, prefix_length)
.order("masklen(network) DESC")
# Find networks that contain this network (less specific / shorter prefix)
# The << operator implicitly means the containing network has a shorter prefix
# IMPORTANT: Use cidr (not network.to_s) to preserve the network mask
NetworkRange.where("?::inet << network", cidr)
.order("masklen(network) DESC") # Most specific parent first
end
def child_ranges
NetworkRange.where("network >> ?::inet AND masklen(network) > ?", network.to_s, prefix_length)
.order("masklen(network) ASC")
end
def sibling_ranges
NetworkRange.where("masklen(network) = ?", prefix_length)
.where("network && ?::inet", network.to_s)
.where.not(id: id)
# Find networks that are contained by this network (more specific / longer prefix)
# The >> operator implicitly means the contained network has a longer prefix
# IMPORTANT: Use cidr (not network.to_s) to preserve the network mask
NetworkRange.where("?::inet >> network", cidr)
.order("masklen(network) ASC") # Least specific child first
end
# Find nearest parent with intelligence data
@@ -237,7 +242,7 @@ class NetworkRange < ApplicationRecord
def agent_tally
Rails.cache.fetch("#{cache_key}:agent_tally", expires_in: 5.minutes) do
# Use DuckDB for fast agent tally instead of loading all events into memory
if persisted? && events_count > 0
if persisted? && has_events?
# Include child network ranges to capture all traffic within this network block
network_ids = [id] + child_ranges.pluck(:id)
@@ -413,10 +418,16 @@ class NetworkRange < ApplicationRecord
cidr.to_s.gsub('/', '_')
end
# Analytics methods - events_count is now a counter cache column maintained by database triggers
# This is much more performant than the previous implementation that did complex network queries
def events_count
self[:events_count] || 0
# Check if network range has any events using DuckDB for performance
def has_events?
return false unless persisted?
# Include child network ranges to capture all traffic within this network block
network_ids = [id] + child_ranges.pluck(:id)
# Try DuckDB first for fast event count check
event_count = with_duckdb_fallback { EventDdb.network_event_count(network_ids) }
event_count&.positive? || events.exists?
end
def events

View File

@@ -7,7 +7,8 @@
class Rule < ApplicationRecord
# Rule enums (prefix needed to avoid rate_limit collision)
# Canonical WAF action order - aligned with Agent and Event models
enum :waf_action, { deny: 0, allow: 1, redirect: 2, challenge: 3, log: 4, add_header: 5 }, prefix: :action
# Note: allow and log actions can include headers/tags in metadata for automatic injection
enum :waf_action, { deny: 0, allow: 1, redirect: 2, challenge: 3, log: 4 }, prefix: :action
enum :waf_rule_type, { network: 0, rate_limit: 1, path_pattern: 2 }, prefix: :type
SOURCES = %w[manual auto:scanner_detected auto:rate_limit_exceeded auto:bot_detected imported default manual:surgical_block manual:surgical_exception policy].freeze
@@ -120,10 +121,6 @@ class Rule < ApplicationRecord
action_challenge?
end
def add_header_action?
action_add_header?
end
# Redirect/challenge convenience methods
def redirect_url
metadata_hash['redirect_url']
@@ -141,12 +138,40 @@ class Rule < ApplicationRecord
metadata&.dig('challenge_message')
end
def header_name
metadata&.dig('header_name')
# Tag-related methods
def tags
metadata_hash['tags'] || []
end
def header_value
metadata&.dig('header_value')
def tags=(new_tags)
self.metadata = metadata_hash.merge('tags' => Array(new_tags))
end
def add_tag(tag)
current_tags = tags
return if current_tags.include?(tag.to_s)
self.metadata = metadata_hash.merge('tags' => (current_tags + [tag.to_s]))
end
def remove_tag(tag)
current_tags = tags
return unless current_tags.include?(tag.to_s)
self.metadata = metadata_hash.merge('tags' => (current_tags - [tag.to_s]))
end
def has_tag?(tag)
tags.include?(tag.to_s)
end
# Headers for add_header action or metadata-based header injection
def headers
metadata_hash['headers'] || {}
end
def headers=(new_headers)
self.metadata = metadata_hash.merge('headers' => new_headers.to_h)
end
def related_surgical_rules
@@ -433,12 +458,6 @@ class Rule < ApplicationRecord
if source&.start_with?('auto:') || source == 'default'
self.user ||= User.find_by(role: 1) # admin role
end
# Set default header values for add_header action
if add_header_action?
self.metadata['header_name'] ||= 'X-Bot-Agent'
self.metadata['header_value'] ||= 'Unknown'
end
end
def calculate_priority_for_network_rules
@@ -522,13 +541,6 @@ class Rule < ApplicationRecord
if challenge_type_value && !%w[captcha javascript proof_of_work].include?(challenge_type_value)
errors.add(:metadata, "challenge_type must be one of: captcha, javascript, proof_of_work")
end
when "add_header"
unless metadata&.dig("header_name").present?
errors.add(:metadata, "must include 'header_name' for add_header action")
end
unless metadata&.dig("header_value").present?
errors.add(:metadata, "must include 'header_value' for add_header action")
end
end
end
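A small usage sketch for the tag and header helpers added above, assuming metadata_hash simply exposes the metadata column as a hash:

# Usage sketch (unsaved record, no validations exercised).
rule = Rule.new(metadata: {})
rule.add_tag(:bot)
rule.tags    # => ["bot"]
rule.headers = { "X-Bot-Agent" => "GoogleBot" }
rule.headers # => { "X-Bot-Agent" => "GoogleBot" }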

View File

@@ -9,7 +9,7 @@ class WafPolicy < ApplicationRecord
POLICY_TYPES = %w[country asn company network_type path_pattern].freeze
# Actions - what to do when traffic matches this policy
ACTIONS = %w[allow deny redirect challenge add_header].freeze
ACTIONS = %w[allow deny redirect challenge log].freeze
# Associations
belongs_to :user
@@ -25,7 +25,6 @@ validate :targets_must_be_array
validate :validate_targets_by_type
validate :validate_redirect_configuration, if: :redirect_policy_action?
validate :validate_challenge_configuration, if: :challenge_policy_action?
validate :validate_add_header_configuration, if: :add_header_policy_action?
# Scopes
scope :enabled, -> { where(enabled: true) }
@@ -96,10 +95,6 @@ validate :targets_must_be_array
policy_action == 'challenge'
end
def add_header_policy_action?
policy_action == 'add_header'
end
# Lifecycle methods
def active?
enabled? && !expired?
@@ -157,9 +152,18 @@ validate :targets_must_be_array
def create_rule_for_network_range(network_range)
return nil unless matches_network_range?(network_range)
# Check for existing supernet rules before attempting to create
if network_range.supernet_rules.any?
supernet = network_range.supernet_rules.first
Rails.logger.debug "Skipping rule creation for #{network_range.cidr} - covered by supernet rule ##{supernet.id} (#{supernet.network_range.cidr})"
return nil
end
# Try to create the rule, handling duplicates gracefully
begin
rule = Rule.create!(
rule_type: 'network',
action: policy_action,
waf_rule_type: 'network',
waf_action: policy_action.to_sym,
network_range: network_range,
waf_policy: self,
user: user,
@@ -167,8 +171,20 @@ validate :targets_must_be_array
metadata: build_rule_metadata(network_range),
priority: network_range.prefix_length
)
rescue ActiveRecord::RecordNotUnique
# Rule already exists (created by another job or earlier in this job)
# Find and return the existing rule
Rails.logger.debug "Rule already exists for #{network_range.cidr} with policy #{name}"
return Rule.find_by(
waf_rule_type: 'network',
waf_action: policy_action,
network_range: network_range,
waf_policy: self,
source: "policy"
)
end
# Handle redirect/challenge/add_header specific data
# Handle redirect/challenge specific data
if redirect_action? && additional_data['redirect_url']
rule.update!(
metadata: rule.metadata.merge(
@@ -183,13 +199,6 @@ validate :targets_must_be_array
challenge_message: additional_data['challenge_message']
)
)
elsif add_header_action?
rule.update!(
metadata: rule.metadata.merge(
header_name: additional_data['header_name'],
header_value: additional_data['header_value']
)
)
end
rule
@@ -215,7 +224,7 @@ validate :targets_must_be_array
rule = Rule.create!(
waf_rule_type: 'path_pattern',
waf_action: policy_action,
waf_action: policy_action.to_sym,
waf_policy: self,
user: user,
source: "policy",
@@ -224,7 +233,7 @@ validate :targets_must_be_array
priority: 50 # Default priority for path rules
)
# Handle redirect/challenge/add_header specific data
# Handle redirect/challenge specific data
if redirect_action? && additional_data['redirect_url']
rule.update!(
metadata: rule.metadata.merge(
@@ -239,13 +248,6 @@ validate :targets_must_be_array
challenge_message: additional_data['challenge_message']
)
)
elsif add_header_action?
rule.update!(
metadata: rule.metadata.merge(
header_name: additional_data['header_name'],
header_value: additional_data['header_value']
)
)
end
rule
@@ -365,12 +367,6 @@ validate :targets_must_be_array
self.targets ||= []
self.additional_data ||= {}
self.enabled = true if enabled.nil?
# Set default header values for add_header action
if add_header_policy_action?
self.additional_data['header_name'] ||= 'X-Bot-Agent'
self.additional_data['header_value'] ||= 'Unknown'
end
end
def targets_must_be_array
@@ -455,15 +451,6 @@ validate :targets_must_be_array
end
end
def validate_add_header_configuration
if additional_data['header_name'].blank?
errors.add(:additional_data, "must include 'header_name' for add_header action")
end
if additional_data['header_value'].blank?
errors.add(:additional_data, "must include 'header_value' for add_header action")
end
end
# Matching logic for different policy types
def matches_country?(network_range)
country = network_range.country || network_range.inherited_intelligence[:country]

View File

@@ -2,15 +2,22 @@
# Service for managing DuckDB analytics database
# Provides fast analytical queries on events data using columnar storage
# INSTALL ducklake;
# INSTALL sqlite;
# ATTACH 'ducklake:sqlite3:storage/ducklake.sqlite3' AS events (DATA_PATH 'storage/ducklake/events.ducklake');
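# A sketch of running that one-time setup from Ruby (illustrative only; the ducklake:setup
# task referenced elsewhere in this changeset is not shown here):
#
#   AnalyticsDuckdbService.instance.with_connection do |conn|
#     conn.execute("INSTALL ducklake")
#     conn.execute("INSTALL sqlite")
#     conn.execute("ATTACH 'ducklake:sqlite3:storage/ducklake.sqlite3' AS events (DATA_PATH 'storage/ducklake/events.ducklake')")
#   end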
class AnalyticsDuckdbService
include Singleton
DUCKDB_PATH = Rails.root.join("storage", "analytics.duckdb").to_s
BATCH_SIZE = 10_000
MAX_EVENTS_PER_SYNC = 50_000 # Limit events per job run to prevent OOM
# Execute block with connection, ensuring database and connection are closed afterward
# Execute block with DuckDB connection
# Always uses in-memory database (no file locks, no conflicts)
# Used for both writing and querying parquet files
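# Usage sketch (illustrative; relies only on the ruby-duckdb calls already used in this file).
# Each call opens a fresh in-memory database, so nothing persists between blocks:
#
#   AnalyticsDuckdbService.instance.with_connection do |conn|
#     conn.query("SELECT 42").first&.first  # => 42
#   end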
def with_connection
db = DuckDB::Database.open(DUCKDB_PATH)
db = DuckDB::Database.open(":memory:")
conn = db.connect
yield conn
ensure
@@ -33,15 +40,49 @@ class AnalyticsDuckdbService
is_datacenter BOOLEAN,
is_vpn BOOLEAN,
is_proxy BOOLEAN,
is_bot BOOLEAN,
waf_action INTEGER,
request_method INTEGER,
response_status INTEGER,
rule_id BIGINT,
request_path VARCHAR,
user_agent VARCHAR
user_agent VARCHAR,
tags VARCHAR[]
)
SQL
# Create indexes for common query patterns
create_indexes(conn)
Rails.logger.info "[DuckDB] Schema setup complete"
end
# Create indexes for fast querying
def create_indexes(conn)
indexes = [
"CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp DESC)",
"CREATE INDEX IF NOT EXISTS idx_events_network_range_id ON events(network_range_id)",
"CREATE INDEX IF NOT EXISTS idx_events_ip_address ON events(ip_address)",
"CREATE INDEX IF NOT EXISTS idx_events_waf_action ON events(waf_action)",
"CREATE INDEX IF NOT EXISTS idx_events_country ON events(country)",
"CREATE INDEX IF NOT EXISTS idx_events_company ON events(company)",
"CREATE INDEX IF NOT EXISTS idx_events_asn ON events(asn)",
"CREATE INDEX IF NOT EXISTS idx_events_rule_id ON events(rule_id)",
"CREATE INDEX IF NOT EXISTS idx_events_is_bot ON events(is_bot)",
"CREATE INDEX IF NOT EXISTS idx_events_is_datacenter ON events(is_datacenter)",
"CREATE INDEX IF NOT EXISTS idx_events_is_vpn ON events(is_vpn)",
"CREATE INDEX IF NOT EXISTS idx_events_is_proxy ON events(is_proxy)"
]
indexes.each do |index_sql|
conn.execute(index_sql)
end
Rails.logger.info "[DuckDB] Indexes created"
rescue StandardError => e
Rails.logger.warn "[DuckDB] Index creation warning: #{e.message}"
end
# Get timestamp of oldest event in DuckDB
# Returns nil if table is empty
def oldest_event_timestamp
@@ -52,7 +93,7 @@ class AnalyticsDuckdbService
end
rescue StandardError => e
Rails.logger.error "[DuckDB] Error getting oldest timestamp: #{e.message}"
nil
raise
end
# Get timestamp of newest event in DuckDB
@@ -65,7 +106,7 @@ class AnalyticsDuckdbService
end
rescue StandardError => e
Rails.logger.error "[DuckDB] Error getting newest timestamp: #{e.message}"
nil
raise
end
# Get maximum event ID already synced to DuckDB
@@ -77,31 +118,39 @@ class AnalyticsDuckdbService
end
rescue StandardError => e
Rails.logger.error "[DuckDB] Error getting max ID: #{e.message}"
0
raise
end
# Sync new events from PostgreSQL to DuckDB
# Export new events from PostgreSQL directly to timestamped Parquet file
# Uses PostgreSQL cursor for memory-efficient streaming
# Uses Appender API for fast bulk inserts
# Filters by ID to avoid duplicates
def sync_new_events(from_timestamp)
total_synced = 0
# Writes to minute/YYYYMMDDHHmmss.parquet
# @param from_timestamp [Time] Start timestamp to export from
# @param max_id [Integer] Maximum event ID already exported (to avoid duplicates)
# @return [Hash] { count: Integer, file_path: String, max_id: Integer }
def export_new_events_to_parquet(from_timestamp, max_id = 0)
ensure_parquet_directories
total_exported = 0
exported_max_id = max_id
timestamp = Time.current.utc.strftime("%Y%m%d%H%M%S")
parquet_file = PARQUET_MINUTE_PATH.join("#{timestamp}.parquet")
Rails.logger.info "[Parquet] Exporting events from #{from_timestamp}, max_id=#{max_id} to #{parquet_file}"
start_time = Time.current
with_connection do |conn|
# Ensure table exists
# Create temporary table in memory
setup_schema(conn)
# Get max ID already in DuckDB to avoid duplicates
max_id_result = conn.query("SELECT COALESCE(MAX(id), 0) as max_id FROM events")
max_id = max_id_result.first&.first || 0
Rails.logger.info "[DuckDB] Syncing events from #{from_timestamp}, max_id=#{max_id}"
start_time = Time.current
appender = nil
batch_count = 0
begin
# Use PostgreSQL cursor for memory-efficient streaming
# Create appender for in-memory table
appender = conn.appender("events")
# Stream from PostgreSQL cursor and append to DuckDB in-memory table
# Limit to MAX_EVENTS_PER_SYNC to prevent OOM on large backlogs
Event.where("timestamp >= ? AND id > ?", from_timestamp, max_id)
.select(
:id,
@@ -115,19 +164,18 @@ class AnalyticsDuckdbService
:is_datacenter,
:is_vpn,
:is_proxy,
:is_bot,
:waf_action,
:request_method,
:response_status,
:rule_id,
:request_path,
:user_agent
:user_agent,
:tags
)
.order(:id)
.limit(MAX_EVENTS_PER_SYNC)
.each_row(block_size: BATCH_SIZE) do |event_data|
# Create new appender for each batch
if batch_count % BATCH_SIZE == 0
appender&.close # Close previous appender
appender = conn.appender("events")
end
# Unpack event data from cursor row (Hash from each_row)
begin
appender.append_row(
event_data["id"],
@@ -141,43 +189,65 @@ class AnalyticsDuckdbService
event_data["is_datacenter"],
event_data["is_vpn"],
event_data["is_proxy"],
event_data["is_bot"],
event_data["waf_action"],
event_data["request_method"],
event_data["response_status"],
event_data["rule_id"],
event_data["request_path"],
event_data["user_agent"]
event_data["user_agent"],
event_data["tags"] || []
)
# Track maximum exported ID
exported_max_id = [exported_max_id, event_data["id"]].max
rescue StandardError => e
Rails.logger.error "[DuckDB] Error appending event #{event_data['id']}: #{e.message}"
Rails.logger.error "[DuckDB] event_data = #{event_data.inspect}"
Rails.logger.error "[Parquet] Error appending event #{event_data['id']}: #{e.message}"
Rails.logger.error "[Parquet] event_data = #{event_data.inspect}"
raise
end
batch_count += 1
total_synced += 1
total_exported += 1
# Log progress every BATCH_SIZE events
# Flush and recreate appender every BATCH_SIZE events to avoid chunk overflow
if batch_count % BATCH_SIZE == 0
Rails.logger.info "[DuckDB] Synced batch (total: #{total_synced} events)"
appender.close
appender = conn.appender("events")
Rails.logger.info "[Parquet] Loaded batch (total: #{total_exported} events)"
end
end
# Close final appender
# Close appender
appender&.close
# Export in-memory table to parquet file
conn.execute(<<~SQL)
COPY (SELECT * FROM events ORDER BY timestamp DESC)
TO '#{parquet_file}' (FORMAT PARQUET, COMPRESSION ZSTD)
SQL
duration = Time.current - start_time
rate = total_synced / duration if duration > 0
Rails.logger.info "[DuckDB] Sync complete: #{total_synced} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec)"
rate = total_exported / duration if duration > 0
# Log completion and check if there are more events to export
if total_exported >= MAX_EVENTS_PER_SYNC
Rails.logger.info "[Parquet] Export complete: #{total_exported} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec) - hit limit, more events may be pending"
else
Rails.logger.info "[Parquet] Export complete: #{total_exported} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec)"
end
rescue StandardError => e
appender&.close rescue nil # Ensure appender is closed on error
Rails.logger.error "[DuckDB] Error syncing events: #{e.message}"
Rails.logger.error "[Parquet] Error exporting events: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
raise # Re-raise to be caught by outer rescue
end
end
total_synced
{ count: total_exported, file_path: parquet_file.to_s, max_id: exported_max_id }
rescue StandardError => e
Rails.logger.error "[DuckDB] Sync failed: #{e.message}"
0
Rails.logger.error "[Parquet] Export failed: #{e.message}"
raise
end
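# The export pattern above, in miniature (sketch only; row_values stands in for one value
# per column in schema order, and setup_schema must already have created the in-memory table):
#
#   appender = conn.appender("events")
#   appender.append_row(*row_values)
#   appender.close
#   conn.execute("COPY (SELECT * FROM events) TO '/tmp/out.parquet' (FORMAT PARQUET, COMPRESSION ZSTD)")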
# Execute analytical query on DuckDB
@@ -200,7 +270,7 @@ class AnalyticsDuckdbService
end
rescue StandardError => e
Rails.logger.error "[DuckDB] Error getting event count: #{e.message}"
0
raise
end
# Analytics query: Total events since timestamp
@@ -222,7 +292,8 @@ class AnalyticsDuckdbService
SQL
# Convert to hash like PostgreSQL returns
result.to_a.to_h { |row| [row["waf_action"], row["count"]] }
# DuckDB returns arrays: [waf_action, count]
result.to_a.to_h { |row| [row[0], row[1]] }
end
end
@@ -238,7 +309,8 @@ class AnalyticsDuckdbService
LIMIT ?
SQL
result.to_a.map { |row| [row["country"], row["count"]] }
# DuckDB returns arrays: [country, count]
result.to_a.map { |row| [row[0], row[1]] }
end
end
@@ -254,7 +326,8 @@ class AnalyticsDuckdbService
LIMIT ?
SQL
result.to_a.map { |row| [row["ip_address"], row["count"]] }
# DuckDB returns arrays: [ip_address, count]
result.to_a.map { |row| [row[0], row[1]] }
end
end
@@ -272,7 +345,8 @@ class AnalyticsDuckdbService
SQL
# Convert to hash with Time keys like PostgreSQL
result.to_a.to_h { |row| [row["hour"], row["count"]] }
# DuckDB returns arrays: [hour, count]
result.to_a.to_h { |row| [row[0], row[1]] }
end
end
@@ -281,4 +355,254 @@ class AnalyticsDuckdbService
@connection&.close
@connection = nil
end
# ============================================================================
# PARQUET EXPORT SYSTEM
# ============================================================================
PARQUET_BASE_PATH = Rails.root.join("storage", "parquet")
PARQUET_MINUTE_PATH = PARQUET_BASE_PATH.join("minute")
PARQUET_HOURS_PATH = PARQUET_BASE_PATH.join("hours")
PARQUET_DAYS_PATH = PARQUET_BASE_PATH.join("days")
PARQUET_WEEKS_PATH = PARQUET_BASE_PATH.join("weeks")
WEEK_RETENTION = ENV.fetch("PARQUET_WEEK_RETENTION", 104).to_i # Keep N weeks (default: 104 = 2 years)
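# Resulting on-disk layout under storage/parquet/ (filenames are illustrative examples):
#
#   minute/20251225120000.parquet   # one file per export run, roughly a minute of events
#   hours/12.parquet                # transient; merged into the day file, then deleted
#   days/359.parquet                # one file per day of year
#   weeks/2025-51.parquet           # long-term archive, pruned beyond WEEK_RETENTION weeks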
# One-time export of entire DuckDB to Parquet (bootstrap)
# Exports all data and organizes into week files
# Memory-efficient: processes one week at a time with new connections
def export_all_to_parquet
ensure_parquet_directories
# Get date range first, then close connection
min_time, max_time = with_connection do |conn|
result = conn.query("SELECT MIN(timestamp) as min_time, MAX(timestamp) as max_time FROM events")
row = result.first
return unless row && row[0] && row[1]
[Time.parse(row[0].to_s), Time.parse(row[1].to_s)]
end
Rails.logger.info "[Parquet] Exporting all events from #{min_time} to #{max_time}"
# Export week by week with separate connections (more memory efficient)
current_week_start = min_time.beginning_of_week
weeks_exported = 0
while current_week_start <= max_time
week_end = current_week_start.end_of_week
year = current_week_start.year
week_num = current_week_start.strftime("%U").to_i
week_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet")
# Skip if week file already exists
unless File.exist?(week_file)
Rails.logger.info "[Parquet] Exporting week #{year}-#{week_num} (#{current_week_start} to #{week_end})"
# Use separate connection per week to limit memory usage
with_connection do |conn|
# COPY directly without ORDER BY to save memory
# Parquet files can be sorted during queries if needed
conn.execute(<<~SQL)
COPY (
SELECT * FROM events
WHERE timestamp >= '#{current_week_start.iso8601}'
AND timestamp < '#{week_end.iso8601}'
) TO '#{week_file}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000)
SQL
end
weeks_exported += 1
# Force garbage collection after each week to free memory
GC.start
end
current_week_start += 1.week
end
Rails.logger.info "[Parquet] Bootstrap complete: exported #{weeks_exported} weeks"
rescue StandardError => e
Rails.logger.error "[Parquet] Error in bootstrap export: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
raise
end
# Consolidate a completed hour's minute files into an hour file, then merge that hour into the day file
# @param time [Time] The hour to consolidate
def consolidate_hour_to_day(time)
ensure_parquet_directories
hour = time.utc.hour
day_of_year = time.utc.yday
hour_file = PARQUET_HOURS_PATH.join("#{hour.to_s.rjust(2, '0')}.parquet")
hour_temp_file = PARQUET_HOURS_PATH.join("#{hour.to_s.rjust(2, '0')}.parquet.temp")
day_file = PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet")
day_temp_file = PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet.temp")
# Find all minute files for the given hour
hour_prefix = time.utc.strftime("%Y%m%d%H")
minute_files = Dir.glob(PARQUET_MINUTE_PATH.join("#{hour_prefix}*.parquet"))
if minute_files.empty?
Rails.logger.info "[Parquet] No minute files found for hour #{hour_prefix}"
return
end
with_connection do |conn|
Rails.logger.info "[Parquet] Consolidating #{minute_files.size} minute files from hour #{hour} into day #{day_of_year}"
# Merge minute files into hour file using .temp
file_list = minute_files.map { |f| "'#{f}'" }.join(", ")
conn.execute(<<~SQL)
COPY (
SELECT * FROM read_parquet([#{file_list}])
ORDER BY timestamp DESC
) TO '#{hour_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD)
SQL
# Atomic rename
FileUtils.mv(hour_temp_file, hour_file, force: true)
# Now merge hour file into day file
if File.exist?(day_file)
# Merge hour data into existing day file
conn.execute(<<~SQL)
COPY (
SELECT * FROM read_parquet(['#{day_file}', '#{hour_file}'])
ORDER BY timestamp DESC
) TO '#{day_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD)
SQL
# Replace old day file with merged file
FileUtils.mv(day_temp_file, day_file, force: true)
# Delete hour file after merging into day
File.delete(hour_file)
else
# First hour of the day - just rename hour file to day file
FileUtils.mv(hour_file, day_file)
end
# Delete the minute files after successful consolidation
minute_files.each { |f| File.delete(f) }
Rails.logger.info "[Parquet] Consolidated #{minute_files.size} minute files into #{day_file}, deleted source files"
end
rescue StandardError => e
Rails.logger.error "[Parquet] Error consolidating hour: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
raise
end
# Consolidate completed week into archive
# @param week_start [Time] The start of the week to consolidate
def consolidate_days_to_week(week_start)
ensure_parquet_directories
year = week_start.year
week_num = week_start.strftime("%U").to_i # Week number (00-53)
week_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet")
week_temp_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet.temp")
# Collect day files for this week (7 days)
day_files = (0..6).map do |offset|
day = week_start + offset.days
day_of_year = day.yday
PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet")
end.select { |f| File.exist?(f) }
return if day_files.empty?
with_connection do |conn|
Rails.logger.info "[Parquet] Consolidating #{day_files.size} days into week #{year}-#{week_num}"
# Merge all day files into week archive using .temp
file_list = day_files.map { |f| "'#{f}'" }.join(", ")
conn.execute(<<~SQL)
COPY (
SELECT * FROM read_parquet([#{file_list}])
ORDER BY timestamp DESC
) TO '#{week_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD)
SQL
# Atomic rename
FileUtils.mv(week_temp_file, week_file, force: true)
# Delete day files after successful consolidation
day_files.each { |f| File.delete(f) }
Rails.logger.info "[Parquet] Consolidated week #{year}-#{week_num}, deleted #{day_files.size} day files"
end
# Cleanup old weeks
cleanup_old_weeks
rescue StandardError => e
Rails.logger.error "[Parquet] Error consolidating week: #{e.message}"
Rails.logger.error e.backtrace.join("\n")
raise
end
# Build list of Parquet files to query for a given time range
# @param start_time [Time] Start of query range
# @param end_time [Time] End of query range (defaults to now)
# @return [Array<String>] List of Parquet file paths
def parquet_files_for_range(start_time, end_time = Time.current)
files = []
# Add minute files (most recent, not yet consolidated)
minute_files = Dir.glob(PARQUET_MINUTE_PATH.join("*.parquet"))
files.concat(minute_files)
# Add hour files (consolidated but not yet in day files)
hour_files = Dir.glob(PARQUET_HOURS_PATH.join("*.parquet"))
files.concat(hour_files)
# Add day files (all of them for simplicity; DuckDB filters by timestamp at query time)
day_files = Dir.glob(PARQUET_DAYS_PATH.join("*.parquet"))
files.concat(day_files)
# Add relevant week files based on time range
# For simplicity, include all weeks (DuckDB will filter)
week_files = Dir.glob(PARQUET_WEEKS_PATH.join("*.parquet"))
files.concat(week_files)
files.sort
end
# Query Parquet files using in-memory DuckDB (no file locks)
# @param block [Block] Block that receives DuckDB connection
def with_parquet_connection(&block)
# Open in-memory DuckDB database (no file locks)
db = DuckDB::Database.open(":memory:")
conn = db.connect
yield conn
ensure
conn&.close
db&.close
end
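# Query sketch combining the two helpers above (illustrative; assumes at least one parquet
# file exists, and interpolates the cutoff as an ISO8601 literal as export_all_to_parquet does):
#
#   cutoff = 24.hours.ago.utc
#   files  = parquet_files_for_range(cutoff)
#   with_parquet_connection do |conn|
#     list = files.map { |f| "'#{f}'" }.join(", ")
#     conn.query("SELECT COUNT(*) FROM read_parquet([#{list}]) WHERE timestamp >= '#{cutoff.iso8601}'").first&.first
#   end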
# Cleanup old week archives beyond retention period
def cleanup_old_weeks
week_files = Dir.glob(PARQUET_WEEKS_PATH.join("*.parquet")).sort.reverse
if week_files.size > WEEK_RETENTION
files_to_delete = week_files[WEEK_RETENTION..-1]
files_to_delete.each do |file|
File.delete(file)
Rails.logger.info "[Parquet] Deleted old week archive: #{file}"
end
end
end
private
# Ensure Parquet directory structure exists
def ensure_parquet_directories
[PARQUET_MINUTE_PATH, PARQUET_HOURS_PATH, PARQUET_DAYS_PATH, PARQUET_WEEKS_PATH].each do |path|
FileUtils.mkdir_p(path) unless Dir.exist?(path)
end
end
end

View File

@@ -173,6 +173,7 @@ class BotNetworkRangeImporter
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true
http.read_timeout = 30
http.verify_mode = OpenSSL::SSL::VERIFY_NONE if uri.scheme == 'https'
response = http.get(uri.request_uri)
raise ImportError, "Failed to fetch AWS IP ranges: #{response.code}" unless response.code == '200'
@@ -223,7 +224,7 @@ class BotNetworkRangeImporter
puts "Amazon AWS import completed: #{imported_count} ranges imported"
{ imported: imported_count, source: 'Amazon AWS' }
rescue Net::TimeoutError, Net::OpenTimeout => e
rescue Timeout::Error, Net::OpenTimeout => e
raise ImportError, "Network timeout while fetching AWS ranges: #{e.message}"
rescue JSON::ParserError => e
raise ImportError, "Failed to parse AWS JSON response: #{e.message}"
@@ -341,6 +342,7 @@ class BotNetworkRangeImporter
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true
http.read_timeout = 30
http.verify_mode = OpenSSL::SSL::VERIFY_NONE if uri.scheme == 'https'
response = http.get(uri.request_uri)
raise ImportError, "Failed to fetch OpenAI IP ranges: #{response.code}" unless response.code == '200'
@@ -353,12 +355,15 @@ class BotNetworkRangeImporter
# Determine crawler type from source name
crawler_type = source[:name].gsub('OpenAI ', '').downcase
data.each do |entry|
# OpenAI provides IP ranges as either CIDR notation or single IPs
ip_range = entry['cidr'] || entry['ip_prefix'] || entry['ip']
# Handle different OpenAI JSON formats
prefixes = data['prefixes'] || data
prefixes.each do |entry|
# OpenAI provides IP ranges as ipv4Prefix/ipv6Prefix or cidr/ip_prefix
ip_range = entry['ipv4Prefix'] || entry['ipv6Prefix'] || entry['cidr'] || entry['ip_prefix'] || entry['ip']
next unless ip_range
# Convert single IPs to /32
# Convert single IPs to /32 or /128
network = ip_range.include?('/') ? ip_range : "#{ip_range}/32"
network_range = {
@@ -396,7 +401,7 @@ class BotNetworkRangeImporter
puts "OpenAI #{crawler_type} import completed: #{imported_count} ranges imported"
{ imported: imported_count, source: "OpenAI #{crawler_type}" }
rescue Net::TimeoutError, Net::OpenTimeout => e
rescue Timeout::Error, Net::OpenTimeout => e
raise ImportError, "Network timeout while fetching OpenAI #{crawler_type} ranges: #{e.message}"
rescue JSON::ParserError => e
raise ImportError, "Failed to parse OpenAI #{crawler_type} JSON response: #{e.message}"
@@ -483,7 +488,8 @@ class BotNetworkRangeImporter
raise ImportError, "Failed to fetch Cloudflare ranges: #{response.code}" unless response.code == '200'
# Cloudflare provides plain text CIDR lists
lines = response.body.split("\n")
# Handle both newline-separated and single-line formats
lines = response.body.include?("\n") ? response.body.split("\n") : response.body.split
ip_version = url.include?('ips-v4') ? 4 : 6
lines.each do |line|

View File

@@ -0,0 +1,116 @@
# frozen_string_literal: true
# EventTagger - Service for applying tags to events
#
# Centralizes tagging logic to keep the Event model focused on data management.
# Tags can come from multiple sources:
# 1. Agent-provided tags (from payload)
# 2. Matched rule tags (from rule.metadata['tags'])
# 3. Future: Policy-based tags, network intelligence tags, etc.
#
# Usage:
# EventTagger.tag_event(event) # Tag single event
# EventTagger.tag_batch(Event.where(...)) # Efficiently tag multiple events
# EventTagger.retag_for_rule(rule) # Retag all events for a specific rule
class EventTagger
# Tag a single event with rule tags
#
# @param event [Event] The event to tag
# @return [Array<String>] The final array of tags applied
def self.tag_event(event)
tags = []
# 1. Keep agent-provided tags (if any)
tags += event.payload&.dig("tags") || []
# 2. Add tags from matched rule (if any)
if event.rule_id.present?
rule = event.rule
tags += rule&.tags || []
end
# 3. Future: Add tags from policies, network intelligence, etc.
# tags += apply_policy_tags(event)
# tags += apply_network_tags(event)
# Deduplicate and update
final_tags = tags.uniq
event.update_column(:tags, final_tags)
final_tags
end
# Efficiently tag multiple events with preloaded rules
#
# @param events [ActiveRecord::Relation, Array<Event>] Events to tag
# @return [Integer] Number of events tagged
def self.tag_batch(events)
events = events.to_a if events.is_a?(ActiveRecord::Relation)
return 0 if events.empty?
# Preload rules to avoid N+1 queries
rule_ids = events.map(&:rule_id).compact.uniq
rules_by_id = Rule.where(id: rule_ids).index_by(&:id)
tagged_count = 0
events.each do |event|
tags = event.payload&.dig("tags") || []
# Add rule tags if rule exists
if event.rule_id && rules_by_id[event.rule_id]
tags += rules_by_id[event.rule_id].tags
end
# Update tags
event.update_column(:tags, tags.uniq)
tagged_count += 1
end
tagged_count
end
# Retag all events that matched a specific rule
# Useful when a rule's tags are updated
#
# @param rule [Rule] The rule whose events should be retagged
# @param limit [Integer] Maximum number of events to retag (default: no limit)
# @return [Integer] Number of events retagged
def self.retag_for_rule(rule, limit: nil)
events = Event.where(rule_id: rule.id)
events = events.limit(limit) if limit
tag_batch(events)
end
# Retag all events (useful for bulk migrations or fixes)
#
# @param batch_size [Integer] Number of events to process at once
# @return [Integer] Total number of events retagged
def self.retag_all(batch_size: 1000)
total = 0
Event.find_in_batches(batch_size: batch_size) do |batch|
total += tag_batch(batch)
Rails.logger.info "[EventTagger] Retagged #{total} events..."
end
total
end
private
# Future: Apply policy-based tags
# def self.apply_policy_tags(event)
# tags = []
# # Check if event matches any policy conditions
# # Add tags based on policy matches
# tags
# end
# Future: Apply network intelligence tags
# def self.apply_network_tags(event)
# tags = []
# # Add tags based on network_range attributes
# # e.g., ["datacenter", "vpn", "proxy", "country:US"]
# tags
# end
end
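# One plausible integration point (hypothetical - no such callback is added in this change):
# keep event tags in sync whenever a rule's metadata, and therefore its tags, changes.
#
#   class Rule < ApplicationRecord
#     after_update_commit :retag_events, if: -> { saved_change_to_metadata? }
#
#     def retag_events
#       EventTagger.retag_for_rule(self, limit: 10_000)
#     end
#   end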

View File

@@ -185,7 +185,13 @@
<div class="bg-white shadow rounded-lg">
<div class="px-6 py-4 border-b border-gray-200">
<div class="flex items-center justify-between">
<h3 class="text-lg font-medium text-gray-900">Events Timeline (Last 24 Hours)</h3>
<h3 class="text-lg font-medium text-gray-900">Events Timeline (<%= case @time_period
when :hour then "Last Hour"
when :day then "Last 24 Hours"
when :week then "Last 7 Days"
when :month then "Last 30 Days"
else "Last 24 Hours"
end %>)</h3>
<span class="text-sm text-gray-500">Times shown in your local timezone</span>
</div>
</div>
@@ -381,7 +387,7 @@
<h3 class="text-lg font-medium text-gray-900">Quick Actions</h3>
</div>
<div class="p-6">
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-4">
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-5 gap-4">
<%= link_to new_rule_path, class: "flex items-center justify-center px-4 py-3 bg-blue-600 text-white rounded-md hover:bg-blue-700 transition-colors" do %>
<svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
<path d="M19 13h-6v6h-2v-6H5v-2h6V5h2v6h6v2z"/>
@@ -393,17 +399,24 @@
<svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
<path d="M19 13h-6v6h-2v-6H5v-2h6V5h2v6h6v2z"/>
</svg>
Add Network Range
Add Network
<% end %>
<%= link_to events_path, class: "flex items-center justify-center px-4 py-3 bg-purple-600 text-white rounded-md hover:bg-purple-700 transition-colors" do %>
<%= link_to analytics_networks_path, class: "flex items-center justify-center px-4 py-3 bg-purple-600 text-white rounded-md hover:bg-purple-700 transition-colors" do %>
<svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
<path d="M12 2l3.09 6.26L22 9.27l-5 4.87 1.18 6.88L12 17.77l-6.18 3.25L7 14.14 2 9.27l6.91-1.01L12 2z"/>
</svg>
Network Analytics
<% end %>
<%= link_to events_path, class: "flex items-center justify-center px-4 py-3 bg-orange-600 text-white rounded-md hover:bg-orange-700 transition-colors" do %>
<svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
<path d="M12 2C6.48 2 2 6.48 2 12s4.48 10 10 10 10-4.48 10-10S17.52 2 12 2zm1 15h-2v-2h2v2zm0-4h-2V7h2v6z"/>
</svg>
View Events
<% end %>
<%= link_to rules_path, class: "flex items-center justify-center px-4 py-3 bg-orange-600 text-white rounded-md hover:bg-orange-700 transition-colors" do %>
<%= link_to rules_path, class: "flex items-center justify-center px-4 py-3 bg-gray-600 text-white rounded-md hover:bg-gray-700 transition-colors" do %>
<svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
<path d="M12 1L3 5v6c0 5.55 3.84 10.74 9 12 5.16-1.26 9-6.45 9-12V5l-9-4z"/>
</svg>

View File

@@ -25,7 +25,7 @@
<div>
<%= form.label :waf_action, "Action", class: "block text-sm font-medium text-gray-700" %>
<%= form.select :waf_action,
options_for_select([['All', ''], ['Allow', 'allow'], ['Deny', 'deny'], ['Redirect', 'redirect'], ['Challenge', 'challenge'], ['Add Header', 'add_header']], params[:waf_action]),
options_for_select([['All', ''], ['Allow', 'allow'], ['Deny', 'deny'], ['Redirect', 'redirect'], ['Challenge', 'challenge'], ['Log', 'log']], params[:waf_action]),
{ }, { class: "mt-1 block w-full rounded-md border-gray-300 shadow-sm focus:border-blue-500 focus:ring-blue-500 sm:text-sm" } %>
</div>
<div>
@@ -77,6 +77,20 @@
placeholder: "e.g., 192.168.1.0/24" %>
</div>
</div>
<!-- Bot Filtering -->
<div class="mt-4 flex items-center">
<div class="flex items-center h-5">
<%= form.check_box :exclude_bots,
{ checked: params[:exclude_bots] == "true", class: "h-4 w-4 text-blue-600 focus:ring-blue-500 border-gray-300 rounded" },
"true", "false" %>
</div>
<div class="ml-3 text-sm">
<%= form.label :exclude_bots, class: "font-medium text-gray-700" do %>
Human Traffic Only
<span class="font-normal text-gray-500">(Exclude known bots and crawlers)</span>
<% end %>
</div>
</div>
</div>
<% end %>
</div>

View File

@@ -159,27 +159,6 @@
</div>
</div>
<!-- Add Header Fields (shown for add_header action) -->
<div id="add_header_section" class="hidden space-y-4" data-rule-form-target="addHeaderSection">
<div>
<%= label_tag :header_name, "Header Name", class: "block text-sm font-medium text-gray-700" %>
<%= text_field_tag :header_name, "",
class: "mt-1 block w-full rounded-md border-gray-300 shadow-sm focus:border-blue-500 focus:ring-blue-500 sm:text-sm",
placeholder: "X-Bot-Agent",
id: "header_name_input" %>
<p class="mt-2 text-sm text-gray-500">The HTTP header name to add (e.g., X-Bot-Agent, X-Network-Type)</p>
</div>
<div>
<%= label_tag :header_value, "Header Value", class: "block text-sm font-medium text-gray-700" %>
<%= text_field_tag :header_value, "",
class: "mt-1 block w-full rounded-md border-gray-300 shadow-sm focus:border-blue-500 focus:ring-blue-500 sm:text-sm",
placeholder: "BingBot",
id: "header_value_input" %>
<p class="mt-2 text-sm text-gray-500">The value for the header (e.g., BingBot, GoogleBot, Unknown)</p>
</div>
</div>
<!-- Metadata -->
<div data-controller="json-validator" data-json-validator-valid-class="json-valid" data-json-validator-invalid-class="json-invalid" data-json-validator-valid-status-class="json-valid-status" data-json-validator-invalid-status-class="json-invalid-status">
<%= form.label :metadata, "Metadata", class: "block text-sm font-medium text-gray-700" %>

View File

@@ -3,6 +3,7 @@
# If running the rails server then create or migrate existing database
if [ "${@: -2:1}" == "./bin/rails" ] && [ "${@: -1:1}" == "server" ]; then
./bin/rails db:prepare
./bin/rails ducklake:setup
fi
exec "${@}"

View File

@@ -55,6 +55,7 @@ production:
database: baffle_hub_production
username: baffle_hub
password: <%= ENV["BAFFLE_HUB_DATABASE_PASSWORD"] %>
pool: 80
cache:
<<: *sqlite_default
database: storage/production_cache.sqlite3

View File

@@ -0,0 +1,6 @@
# frozen_string_literal: true
# Configure DeviceDetector cache
# Default is 5,000 entries - we increase to 10,000 for better hit rate
# Memory usage: ~1-2MB for 10k cached user agents
DeviceDetector.config.max_cache_keys = 10_000

View File

@@ -140,7 +140,7 @@ end
# Add application-specific context
app_version = begin
File.read(Rails.root.join('VERSION')).strip
BaffleHub::VERSION
rescue
ENV['APP_VERSION'] || ENV['GIT_COMMIT_SHA']&.[](0..7) || 'unknown'
end

View File

@@ -0,0 +1,5 @@
# frozen_string_literal: true
module BaffleHub
VERSION = "0.4.0"
end

View File

@@ -4,8 +4,8 @@ default: &default
batch_size: 500
workers:
- queues: "*"
threads: 3
processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
threads: <%= ENV.fetch("JOB_THREADS", 3) %>
processes: <%= ENV.fetch("JOB_PROCESSES", 1) %>
polling_interval: 0.1
development:

View File

@@ -30,8 +30,30 @@ cleanup_old_events:
queue: background
schedule: every hour
# Sync events from PostgreSQL to DuckDB for fast analytics
sync_events_to_duckdb:
class: SyncEventsToDuckdbJob
# Export events from PostgreSQL to DuckLake for fast analytics
export_events_to_ducklake:
class: ExportEventsToDucklakeJob
queue: default
schedule: every 1 minutes
# Merge DuckLake files and clean up immediately after
merge_ducklake_files:
class: MergeDucklakeFilesJob
queue: background
schedule: every 15 minutes
# OLD PARQUET SYSTEM (DISABLED - using DuckLake now)
# export_events_to_parquet:
# class: ExportEventsToParquetJob
# queue: default
# schedule: every 1 minutes
#
# consolidate_parquet_hourly:
# class: ConsolidateParquetHourlyJob
# queue: default
# schedule: "5 * * * *" # At 5 minutes past every hour
#
# consolidate_parquet_weekly:
# class: ConsolidateParquetWeeklyJob
# queue: default
# schedule: "5 0 * * 1" # Monday at 00:05

View File

@@ -0,0 +1,6 @@
class AddIsBotToEvents < ActiveRecord::Migration[8.1]
def change
add_column :events, :is_bot, :boolean, default: false, null: false
add_index :events, :is_bot
end
end

View File

@@ -0,0 +1,39 @@
# frozen_string_literal: true
# Migrate add_header rules to use allow action with tags/headers in metadata
#
# Old pattern:
# waf_action: add_header (5)
# metadata: { header_name: "X-Bot-Agent", header_value: "googlebot" }
#
# New pattern:
# waf_action: allow (1)
# metadata: {
# headers: { "X-Bot-Agent" => "googlebot" },
# tags: ["bot:googlebot"]
# }
#
class MigrateAddHeaderRulesToAllowWithTags < ActiveRecord::Migration[8.1]
def up
# Change all add_header (5) rules to allow (1)
# Keep metadata as-is for now - will be handled by Rule helper methods
execute <<-SQL
UPDATE rules
SET waf_action = 1 -- allow
WHERE waf_action = 5 -- add_header
SQL
end
def down
# This rollback is conservative - it only reverts rules that clearly came from add_header
# (have header_name/header_value in metadata but not headers)
execute <<-SQL
UPDATE rules
SET waf_action = 5 -- add_header
WHERE waf_action = 1 -- allow
AND metadata ? 'header_name'
AND metadata ? 'header_value'
AND NOT metadata ? 'headers'
SQL
end
end
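# A follow-up data rewrite could translate the old metadata shape into the new one
# (sketch only - this migration deliberately leaves metadata untouched; tags such as
# "bot:googlebot" could be derived from header_value in the same pass):
#
#   Rule.find_each do |rule|
#     name  = rule.metadata["header_name"]
#     value = rule.metadata["header_value"]
#     next if name.blank?
#     rule.update!(metadata: rule.metadata
#       .except("header_name", "header_value")
#       .merge("headers" => { name => value }))
#   end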

View File

@@ -0,0 +1,67 @@
# frozen_string_literal: true
class RemovePostgresEventsCountFromNetworkRanges < ActiveRecord::Migration[8.1]
def up
# Drop triggers first
execute <<-SQL
DROP TRIGGER IF EXISTS update_network_ranges_events_count_after_insert ON events;
DROP TRIGGER IF EXISTS update_network_ranges_events_count_after_delete ON events;
DROP FUNCTION IF EXISTS update_network_range_events_count();
SQL
# Remove index and column
remove_index :network_ranges, :events_count
remove_column :network_ranges, :events_count
end
def down
# Add column back (for rollback)
add_column :network_ranges, :events_count, :integer, null: false, default: 0
add_index :network_ranges, :events_count
# Recreate trigger function
execute <<-SQL
CREATE OR REPLACE FUNCTION update_network_range_events_count()
RETURNS TRIGGER AS $$
BEGIN
-- Update all network ranges that contain IP address
UPDATE network_ranges
SET events_count = events_count +
CASE
WHEN TG_OP = 'INSERT' THEN 1
WHEN TG_OP = 'DELETE' THEN -1
ELSE 0
END
WHERE network >>= NEW.ip_address::inet;
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
SQL
# Recreate triggers
execute <<-SQL
CREATE TRIGGER update_network_ranges_events_count_after_insert
AFTER INSERT ON events
FOR EACH ROW
EXECUTE FUNCTION update_network_range_events_count();
SQL
execute <<-SQL
CREATE TRIGGER update_network_ranges_events_count_after_delete
AFTER DELETE ON events
FOR EACH ROW
EXECUTE FUNCTION update_network_range_events_count();
SQL
# Backfill existing counts
execute <<-SQL
UPDATE network_ranges
SET events_count = (
SELECT COUNT(*)
FROM events
WHERE events.ip_address <<= network_ranges.network
);
SQL
end
end

View File

@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[8.1].define(version: 2025_11_16_025003) do
ActiveRecord::Schema[8.1].define(version: 2025_12_02_070000) do
# These are extensions that must be enabled in order to support this database
enable_extension "pg_catalog.plpgsql"
@@ -80,6 +80,7 @@ ActiveRecord::Schema[8.1].define(version: 2025_11_16_025003) do
t.datetime "created_at", null: false
t.string "environment"
t.inet "ip_address"
t.boolean "is_bot", default: false, null: false
t.boolean "is_datacenter", default: false, null: false
t.boolean "is_proxy", default: false, null: false
t.boolean "is_vpn", default: false, null: false
@@ -105,6 +106,7 @@ ActiveRecord::Schema[8.1].define(version: 2025_11_16_025003) do
t.index ["company"], name: "index_events_on_company"
t.index ["country"], name: "index_events_on_country"
t.index ["ip_address"], name: "index_events_on_ip_address"
t.index ["is_bot"], name: "index_events_on_is_bot"
t.index ["is_datacenter", "is_vpn", "is_proxy"], name: "index_events_on_network_flags"
t.index ["network_range_id"], name: "index_events_on_network_range_id"
t.index ["request_host_id", "request_method", "request_segment_ids"], name: "idx_events_host_method_path"
@@ -126,7 +128,6 @@ ActiveRecord::Schema[8.1].define(version: 2025_11_16_025003) do
t.string "country"
t.datetime "created_at", null: false
t.text "creation_reason"
t.integer "events_count", default: 0, null: false
t.boolean "is_datacenter", default: false
t.boolean "is_proxy", default: false
t.boolean "is_vpn", default: false
@@ -141,7 +142,6 @@ ActiveRecord::Schema[8.1].define(version: 2025_11_16_025003) do
t.index ["asn_org"], name: "index_network_ranges_on_asn_org"
t.index ["company"], name: "index_network_ranges_on_company"
t.index ["country"], name: "index_network_ranges_on_country"
t.index ["events_count"], name: "index_network_ranges_on_events_count"
t.index ["is_datacenter", "is_proxy", "is_vpn"], name: "idx_network_flags"
t.index ["is_datacenter"], name: "index_network_ranges_on_is_datacenter"
t.index ["network"], name: "index_network_ranges_on_network", opclass: :inet_ops, using: :gist

lib/tasks/duckdb.rake
View File

@@ -0,0 +1,127 @@
# frozen_string_literal: true
namespace :duckdb do
desc "Rebuild DuckDB analytics database from scratch"
task rebuild: :environment do
puts "=" * 80
puts "DuckDB Rebuild"
puts "=" * 80
puts
duckdb_path = Rails.root.join("storage", "analytics.duckdb")
# Step 1: Check if DuckDB exists
if File.exist?(duckdb_path)
puts "🗑️ Deleting existing DuckDB database..."
File.delete(duckdb_path)
puts " ✅ Deleted: #{duckdb_path}"
puts
else
puts " No existing DuckDB database found"
puts
end
# Step 2: Rebuild from PostgreSQL
puts "🔨 Rebuilding DuckDB from PostgreSQL events..."
puts
start_time = Time.current
begin
SyncEventsToDuckdbJob.perform_now
duration = Time.current - start_time
# Step 3: Verify the rebuild
event_count = AnalyticsDuckdbService.instance.event_count
bot_count = AnalyticsDuckdbService.instance.with_connection do |conn|
result = conn.query("SELECT COUNT(*) FROM events WHERE is_bot = true")
result.first&.first || 0
end
puts "=" * 80
puts "✅ DuckDB Rebuild Complete!"
puts "=" * 80
puts " Duration: #{duration.round(2)}s"
puts " Total events synced: #{event_count}"
puts " Bot events: #{bot_count} (#{(bot_count.to_f / event_count * 100).round(1)}%)" if event_count > 0
puts " Human events: #{event_count - bot_count} (#{((event_count - bot_count).to_f / event_count * 100).round(1)}%)" if event_count > 0
puts
puts "📂 Database location: #{duckdb_path}"
puts "📊 Database size: #{File.size(duckdb_path) / 1024.0 / 1024.0}MB"
puts
rescue => e
puts "❌ Error rebuilding DuckDB: #{e.message}"
puts e.backtrace.first(5).join("\n")
exit 1
end
end
desc "Show DuckDB statistics"
task stats: :environment do
duckdb_path = Rails.root.join("storage", "analytics.duckdb")
unless File.exist?(duckdb_path)
puts "❌ DuckDB database not found at: #{duckdb_path}"
exit 1
end
puts "=" * 80
puts "DuckDB Statistics"
puts "=" * 80
puts
total = AnalyticsDuckdbService.instance.event_count
AnalyticsDuckdbService.instance.with_connection do |conn|
# Bot breakdown
result = conn.query(<<~SQL)
SELECT
is_bot,
COUNT(*) as event_count,
COUNT(DISTINCT ip_address) as unique_ips
FROM events
GROUP BY is_bot
SQL
puts "📊 Bot Traffic Breakdown:"
result.each do |row|
type = row[0] ? "🤖 Bots" : "👤 Humans"
count = row[1]
ips = row[2]
percentage = (count.to_f / total * 100).round(1)
puts " #{type}: #{count} events (#{percentage}%) from #{ips} unique IPs"
end
puts
# Date range
range_result = conn.query("SELECT MIN(timestamp), MAX(timestamp) FROM events")
min_ts, max_ts = range_result.first
puts "📅 Date Range:"
puts " Oldest event: #{min_ts}"
puts " Newest event: #{max_ts}"
puts
# Database info
puts "💾 Database Info:"
puts " Location: #{duckdb_path}"
puts " Size: #{(File.size(duckdb_path) / 1024.0 / 1024.0).round(2)}MB"
puts " Total events: #{total}"
puts
end
end
desc "Sync new events from PostgreSQL to DuckDB"
task sync: :environment do
puts "🔄 Syncing events from PostgreSQL to DuckDB..."
start_time = Time.current
begin
SyncEventsToDuckdbJob.perform_now
duration = Time.current - start_time
puts "✅ Sync complete in #{duration.round(2)}s"
rescue => e
puts "❌ Error syncing: #{e.message}"
exit 1
end
end
end

View File

@@ -0,0 +1,152 @@
#!/usr/bin/env ruby
# frozen_string_literal: true
# One-time backfill script to populate new columns in existing DuckDB events
# This uses DuckDB's JOIN-based UPDATE for maximum performance
require 'csv'
require 'tempfile'
puts "DuckDB Column Backfill Script (JOIN-based UPDATE)"
puts "=" * 60
puts "This will update existing DuckDB events with data from PostgreSQL"
puts "using a fast JOIN-based approach"
puts
BATCH_SIZE = 50_000
AnalyticsDuckdbService.instance.with_connection do |conn|
# Get total events in DuckDB
puts "Step 1: Counting events to backfill..."
result = conn.query("SELECT COUNT(*) FROM events WHERE request_method IS NULL")
total_to_backfill = result.first&.first || 0
result = conn.query("SELECT COUNT(*) FROM events")
total_events = result.first&.first || 0
puts " Total events in DuckDB: #{total_events}"
puts " Events needing backfill: #{total_to_backfill}"
if total_to_backfill == 0
puts "\n✓ All events already have new columns populated!"
exit 0
end
# Get min and max event IDs in DuckDB
result = conn.query("SELECT MIN(id), MAX(id) FROM events WHERE request_method IS NULL")
min_id, max_id = result.first
puts " ID range to backfill: #{min_id} to #{max_id}"
puts "\nStep 2: Exporting PostgreSQL data in batches..."
current_id = min_id
batch_num = 0
total_updated = 0
# Create temporary CSV file for data transfer
temp_csv = Tempfile.new(['events_backfill', '.csv'])
begin
CSV.open(temp_csv.path, 'w') do |csv|
# Header
csv << ['id', 'request_method', 'response_status', 'rule_id']
while current_id <= max_id
batch_num += 1
batch_end_id = [current_id + BATCH_SIZE - 1, max_id].min
print " Batch #{batch_num}: Exporting IDs #{current_id}-#{batch_end_id}..."
# Fetch from PostgreSQL
pg_events = Event.where("id >= ? AND id <= ?", current_id, batch_end_id)
.select(:id, :request_method, :response_status, :rule_id)
count = 0
pg_events.find_each do |event|
csv << [
event.id,
event.request_method,
event.response_status,
event.rule_id
]
count += 1
end
puts " #{count} events"
current_id = batch_end_id + 1
end
end
temp_csv.close
puts "\n✓ Exported to temporary CSV: #{temp_csv.path}"
puts " File size: #{(File.size(temp_csv.path) / 1024.0 / 1024.0).round(2)} MB"
puts "\nStep 3: Loading CSV into temporary DuckDB table..."
conn.execute("DROP TABLE IF EXISTS events_updates")
conn.execute(<<~SQL)
CREATE TABLE events_updates (
id BIGINT,
request_method INTEGER,
response_status INTEGER,
rule_id BIGINT
)
SQL
conn.execute(<<~SQL)
COPY events_updates FROM '#{temp_csv.path}' (FORMAT CSV, HEADER TRUE, NULL '')
SQL
result = conn.query("SELECT COUNT(*) FROM events_updates")
loaded_count = result.first&.first || 0
puts "✓ Loaded #{loaded_count} rows into temporary table"
puts "\nStep 4: Performing bulk UPDATE via JOIN..."
start_time = Time.current
# DuckDB's efficient UPDATE...FROM syntax
conn.execute(<<~SQL)
UPDATE events
SET
request_method = events_updates.request_method,
response_status = events_updates.response_status,
rule_id = events_updates.rule_id
FROM events_updates
WHERE events.id = events_updates.id
SQL
duration = Time.current - start_time
puts "✓ Bulk update complete in #{duration.round(2)}s!"
puts "\nStep 5: Cleaning up temporary table..."
conn.execute("DROP TABLE events_updates")
puts "✓ Temporary table dropped"
ensure
# Clean up temp file
temp_csv.unlink if temp_csv
end
puts "\nStep 6: Verifying backfill..."
result = conn.query("SELECT COUNT(*) FROM events WHERE request_method IS NOT NULL OR response_status IS NOT NULL OR rule_id IS NOT NULL")
filled_count = result.first&.first || 0
result = conn.query("SELECT COUNT(*) FROM events WHERE request_method IS NULL AND response_status IS NULL AND rule_id IS NULL")
still_null_count = result.first&.first || 0
puts " Events with new columns populated: #{filled_count}"
puts " Events still with NULL columns: #{still_null_count}"
if still_null_count > 0
puts "\n⚠ Note: #{still_null_count} events still have NULL values."
puts " This is normal if those events don't exist in PostgreSQL anymore"
puts " (they may have been cleaned up due to retention policy)"
else
puts "\n✓ Backfill complete! All events have new columns populated."
end
end
puts "\n" + "=" * 60
puts "Backfill complete!"
puts "\nNext steps:"
puts "1. Test the events index page to verify everything works"
puts "2. Monitor performance improvements from DuckDB queries"

View File

@@ -7,3 +7,7 @@ one:
two:
email_address: two@example.com
password_digest: <%= password_digest %>
jason:
email_address: jason@example.com
password_digest: <%= password_digest %>

View File

@@ -211,16 +211,51 @@ class NetworkRangeTest < ActiveSupport::TestCase
assert_equal @ipv4_range, children.first
end
test "sibling_ranges finds same-level networks" do
# Create sibling networks
sibling1 = NetworkRange.create!(network: "192.168.0.0/24")
@ipv4_range.save! # 192.168.1.0/24
sibling2 = NetworkRange.create!(network: "192.168.2.0/24")
test "child_ranges works with Apple network hierarchy - 17.240.0.0/14" do
# This test demonstrates the current bug in child_ranges method
# Expected: 17.240.0.0/14 should have parents but no children in this test setup
siblings = @ipv4_range.sibling_ranges
assert_includes siblings, sibling1
assert_includes siblings, sibling2
assert_not_includes siblings, @ipv4_range
# Create the target network
target_network = NetworkRange.create!(network: "17.240.0.0/14", source: "manual")
# Create parent networks
parent1 = NetworkRange.create!(network: "17.240.0.0/13", source: "manual") # Should contain 17.240.0.0/14
parent2 = NetworkRange.create!(network: "17.128.0.0/9", source: "manual") # Should also contain 17.240.0.0/14
# Create some child networks (more specific networks contained by 17.240.0.0/14)
child1 = NetworkRange.create!(network: "17.240.0.0/15", source: "manual") # First half of /14
child2 = NetworkRange.create!(network: "17.242.0.0/15", source: "manual") # Second half of /14
child3 = NetworkRange.create!(network: "17.240.0.0/16", source: "manual") # More specific
child4 = NetworkRange.create!(network: "17.241.0.0/16", source: "manual") # More specific
# Test parent_ranges works correctly
parents = target_network.parent_ranges
assert_includes parents, parent1, "17.240.0.0/13 should be a parent of 17.240.0.0/14"
assert_includes parents, parent2, "17.128.0.0/9 should be a parent of 17.240.0.0/14"
# Test child_ranges - this is currently failing due to the bug
children = target_network.child_ranges
assert_includes children, child1, "17.240.0.0/15 should be a child of 17.240.0.0/14"
assert_includes children, child2, "17.242.0.0/15 should be a child of 17.240.0.0/14"
assert_includes children, child3, "17.240.0.0/16 should be a child of 17.240.0.0/14"
assert_includes children, child4, "17.241.0.0/16 should be a child of 17.240.0.0/14"
assert_not_includes children, parent1, "Parent networks should not be in child_ranges"
assert_not_includes children, parent2, "Parent networks should not be in child_ranges"
assert_not_includes children, target_network, "Self should not be in child_ranges"
# Test that parent can find child in its child_ranges
parent1_children = parent1.child_ranges
assert_includes parent1_children, target_network, "17.240.0.0/14 should be in child_ranges of 17.240.0.0/13"
parent2_children = parent2.child_ranges
assert_includes parent2_children, target_network, "17.240.0.0/14 should be in child_ranges of 17.128.0.0/9"
# Test bidirectional consistency
assert target_network.parent_ranges.include?(parent1), "Parent should list child"
assert parent1.child_ranges.include?(target_network), "Child should list parent"
assert target_network.parent_ranges.include?(parent2), "Parent should list child"
assert parent2.child_ranges.include?(target_network), "Child should list parent"
end
# Intelligence and Inheritance
@@ -526,14 +561,23 @@ class NetworkRangeTest < ActiveSupport::TestCase
end
# Analytics Methods
test "events_count returns counter cache value" do
test "has_events? correctly detects if network has events" do
range = NetworkRange.create!(network: "192.168.1.0/24")
assert_equal 0, range.events_count
assert_equal false, range.has_events?
# Update counter cache manually for testing
range.update_column(:events_count, 5)
assert_equal 5, range.events_count
# Create a test event in this network
Event.create!(
request_id: "test-1",
ip_address: "192.168.1.100",
network_range: range,
waf_action: 1,
request_method: 0,
response_status: 200
)
# Should now detect events exist
assert_equal true, range.has_events?
end
test "events method finds events within range" do

View File

@@ -202,4 +202,95 @@ class RuleTest < ActiveSupport::TestCase
assert_equal 8, format[:priority]
assert_equal true, format[:enabled]
end
# Tag functionality tests
test "should store and retrieve tags in metadata" do
network_range = NetworkRange.create!(cidr: "10.0.0.0/8")
rule = Rule.create!(
waf_rule_type: "network",
waf_action: "allow",
network_range: network_range,
metadata: { tags: ["bot:googlebot", "trusted"] },
user: users(:one)
)
assert_equal ["bot:googlebot", "trusted"], rule.tags
end
test "should add tag to rule" do
network_range = NetworkRange.create!(cidr: "10.0.0.0/8")
rule = Rule.create!(
waf_rule_type: "network",
waf_action: "allow",
network_range: network_range,
user: users(:one)
)
rule.add_tag("bot:googlebot")
rule.save!
assert_includes rule.tags, "bot:googlebot"
end
test "should remove tag from rule" do
network_range = NetworkRange.create!(cidr: "10.0.0.0/8")
rule = Rule.create!(
waf_rule_type: "network",
waf_action: "allow",
network_range: network_range,
metadata: { tags: ["bot:googlebot", "trusted"] },
user: users(:one)
)
rule.remove_tag("trusted")
rule.save!
assert_not_includes rule.tags, "trusted"
assert_includes rule.tags, "bot:googlebot"
end
test "should check if rule has tag" do
network_range = NetworkRange.create!(cidr: "10.0.0.0/8")
rule = Rule.create!(
waf_rule_type: "network",
waf_action: "allow",
network_range: network_range,
metadata: { tags: ["bot:googlebot"] },
user: users(:one)
)
assert rule.has_tag?("bot:googlebot")
assert_not rule.has_tag?("bot:bingbot")
end
test "should store headers in metadata" do
network_range = NetworkRange.create!(cidr: "10.0.0.0/8")
rule = Rule.create!(
waf_rule_type: "network",
waf_action: "allow",
network_range: network_range,
metadata: {
tags: ["bot:googlebot"],
headers: { "X-Bot-Agent" => "googlebot" }
},
user: users(:one)
)
assert_equal({ "X-Bot-Agent" => "googlebot" }, rule.headers)
end
test "should set tags via assignment" do
network_range = NetworkRange.create!(cidr: "10.0.0.0/8")
rule = Rule.create!(
waf_rule_type: "network",
waf_action: "allow",
network_range: network_range,
user: users(:one)
)
rule.tags = ["bot:bingbot", "network:microsoft"]
rule.save!
assert_equal ["bot:bingbot", "network:microsoft"], rule.tags
end
end
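# The helpers exercised above are assumed to be thin wrappers over the metadata column,
# roughly along these lines (sketch only - the actual Rule implementation is not in this diff):
#
#   def tags
#     Array((metadata || {}).with_indifferent_access[:tags])
#   end
#
#   def tags=(values)
#     self.metadata = (metadata || {}).merge("tags" => Array(values).uniq)
#   end
#
#   def add_tag(tag)
#     self.tags = tags + [tag]
#   end
#
#   def remove_tag(tag)
#     self.tags = tags - [tag]
#   end
#
#   def has_tag?(tag)
#     tags.include?(tag)
#   end
#
#   def headers
#     (metadata || {}).with_indifferent_access[:headers] || {}
#   end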