Compare commits: 3eddfe9f7e...expand-geo
9 Commits

| Author | SHA1 | Date |
|---|---|---|
|  | e53e782223 |  |
|  | 108caf2fe6 |  |
|  | 225d970123 |  |
|  | a0ff0edb73 |  |
|  | 693851f664 |  |
|  | 032243ba6a |  |
|  | 1aa77066a1 |  |
|  | f0ad3b2c90 |  |
|  | 54d9c3a0d9 |  |
@@ -27,7 +27,7 @@ RUN apt-get update -qq && \
         *) \
             echo "Unsupported platform: $TARGETPLATFORM" && exit 1 ;; \
     esac && \
-    wget "https://github.com/duckdb/duckdb/releases/download/v1.4.2/libduckdb-linux-${DUCKDB_ARCH}.zip" -O /tmp/libduckdb.zip && \
+    wget "https://github.com/duckdb/duckdb/releases/download/v1.4.3/libduckdb-linux-${DUCKDB_ARCH}.zip" -O /tmp/libduckdb.zip && \
     unzip /tmp/libduckdb.zip -d /tmp/duckdb && \
     cp /tmp/duckdb/duckdb.h /tmp/duckdb/duckdb.hpp /usr/local/include/ && \
     cp /tmp/duckdb/libduckdb.so /usr/local/lib/ && \

@@ -28,17 +28,15 @@ class AnalyticsController < ApplicationController

     # Core statistics - cached (uses DuckDB if available)
     stat_start = Time.current
-    @total_events = Rails.cache.fetch("#{cache_key_base}/total_events", expires_in: cache_ttl) do
-      with_duckdb_fallback { EventDdb.count_since(@start_time) } ||
-        Event.where("timestamp >= ?", @start_time).count
-    end
+    @total_events = BaffleDl.count_since(@start_time)
     Rails.logger.info "[Analytics Perf] Total events: #{((Time.current - stat_start) * 1000).round(1)}ms"

     @total_rules = Rails.cache.fetch("analytics/total_rules", expires_in: 5.minutes) do
       Rule.enabled.count
     end

-    @network_ranges_with_events = Rails.cache.fetch("analytics/network_ranges_with_events", expires_in: 5.minutes) do
+    @network_ranges_with_events = BaffleDl.count_network_ranges_with_events(@start_time) ||
+      Rails.cache.fetch("analytics/network_ranges_with_events", expires_in: 5.minutes) do
       NetworkRange.with_events.count
     end

@@ -46,38 +44,24 @@ class AnalyticsController < ApplicationController
       NetworkRange.count
     end

-    # Event breakdown by action - cached (uses DuckDB if available)
+    # Event breakdown by action - use DuckDB directly for performance
     stat_start = Time.current
     @event_breakdown = Rails.cache.fetch("#{cache_key_base}/event_breakdown", expires_in: cache_ttl) do
-      with_duckdb_fallback { EventDdb.breakdown_by_action(@start_time) } ||
-        Event.where("timestamp >= ?", @start_time)
-          .group(:waf_action)
-          .count
+      BaffleDl.breakdown_by_action(@start_time) || {}
     end
     Rails.logger.info "[Analytics Perf] Event breakdown: #{((Time.current - stat_start) * 1000).round(1)}ms"

-    # Top countries by event count - cached (uses DuckDB if available)
+    # Top countries by event count - use DuckDB directly for performance
     stat_start = Time.current
     @top_countries = Rails.cache.fetch("#{cache_key_base}/top_countries", expires_in: cache_ttl) do
-      with_duckdb_fallback { EventDdb.top_countries(@start_time, 10) } ||
-        Event.where("timestamp >= ? AND country IS NOT NULL", @start_time)
-          .group(:country)
-          .count
-          .sort_by { |_, count| -count }
-          .first(10)
+      BaffleDl.top_countries(@start_time, 10) || []
     end
     Rails.logger.info "[Analytics Perf] Top countries: #{((Time.current - stat_start) * 1000).round(1)}ms"

-    # Top blocked IPs - cached (uses DuckDB if available)
+    # Top blocked IPs - use DuckDB directly for performance
     stat_start = Time.current
     @top_blocked_ips = Rails.cache.fetch("#{cache_key_base}/top_blocked_ips", expires_in: cache_ttl) do
-      with_duckdb_fallback { EventDdb.top_blocked_ips(@start_time, 10) } ||
-        Event.where("timestamp >= ?", @start_time)
-          .where(waf_action: 0) # deny action in enum
-          .group(:ip_address)
-          .count
-          .sort_by { |_, count| -count }
-          .first(10)
+      BaffleDl.top_blocked_ips(@start_time, 10) || []
     end
     Rails.logger.info "[Analytics Perf] Top blocked IPs: #{((Time.current - stat_start) * 1000).round(1)}ms"

@@ -135,8 +119,8 @@ class AnalyticsController < ApplicationController
     @time_period = params[:period]&.to_sym || :day
     @start_time = calculate_start_time(@time_period)

-    # Top networks by request volume - use DuckDB if available
-    network_stats = with_duckdb_fallback { EventDdb.top_networks(@start_time, 50) }
+    # Top networks by request volume - use DuckLake if available
+    network_stats = with_duckdb_fallback { BaffleDl.top_networks(@start_time, 50) }

     if network_stats
       # DuckDB path: array format [network_range_id, event_count, unique_ips]
@@ -200,24 +184,24 @@ class AnalyticsController < ApplicationController
     # Network type breakdown with traffic stats
     @network_breakdown = calculate_network_type_stats(@start_time)

-    # Company breakdown for top traffic sources - use DuckDB if available
-    @top_companies = with_duckdb_fallback { EventDdb.top_companies(@start_time, 20) } ||
+    # Company breakdown for top traffic sources - use DuckLake if available
+    @top_companies = with_duckdb_fallback { BaffleDl.top_companies(@start_time, 20) } ||
       Event.where("timestamp >= ? AND company IS NOT NULL", @start_time)
         .group(:company)
         .select("company, COUNT(*) as event_count, COUNT(DISTINCT ip_address) as unique_ips, COUNT(DISTINCT network_range_id) as network_count")
         .order("event_count DESC")
         .limit(20)

-    # ASN breakdown - use DuckDB if available
-    @top_asns = with_duckdb_fallback { EventDdb.top_asns(@start_time, 15) } ||
+    # ASN breakdown - use DuckLake if available
+    @top_asns = with_duckdb_fallback { BaffleDl.top_asns(@start_time, 15) } ||
       Event.where("timestamp >= ? AND asn IS NOT NULL", @start_time)
         .group(:asn, :asn_org)
         .select("asn, asn_org, COUNT(*) as event_count, COUNT(DISTINCT ip_address) as unique_ips, COUNT(DISTINCT network_range_id) as network_count")
         .order("event_count DESC")
         .limit(15)

-    # Geographic breakdown - use DuckDB if available
-    @top_countries = with_duckdb_fallback { EventDdb.top_countries_with_stats(@start_time, 15) } ||
+    # Geographic breakdown - use DuckLake if available
+    @top_countries = with_duckdb_fallback { BaffleDl.top_countries_with_stats(@start_time, 15) } ||
       Event.where("timestamp >= ? AND country IS NOT NULL", @start_time)
         .group(:country)
         .select("country, COUNT(*) as event_count, COUNT(DISTINCT ip_address) as unique_ips")
@@ -257,19 +241,35 @@ class AnalyticsController < ApplicationController
   end

   def prepare_chart_data_with_split_cache(cache_key_base, cache_ttl)
+    # Generate timeline based on selected time period
+    case @time_period
+    when :hour
+      # Show last 60 minutes for hour view
+      timeline_data = Rails.cache.fetch("#{cache_key_base}/chart_hourly", expires_in: 1.minute) do
+        # For hour view, show minute-by-minute data for the last hour
+        (0..59).map do |minutes_ago|
+          time_point = minutes_ago.minutes.ago
+          count = Event.where("timestamp >= ? AND timestamp < ?", time_point, time_point + 1.minute).count
+          {
+            time_iso: time_point.iso8601,
+            total: count
+          }
+        end.reverse
+      end
+
+    when :day
+      # Show last 24 hours (existing logic)
       # Split timeline into historical (completed hours) and current (incomplete hour)
       # Historical hours are cached for full TTL, current hour cached briefly for freshness

       # Cache historical hours (1-23 hours ago) - these are complete and won't change
-      # No expiration - will stick around until evicted by cache store (uses DuckDB if available)
-      historical_timeline = Rails.cache.fetch("#{cache_key_base}/chart_historical") do
+      # Use DuckDB directly for performance, no PostgreSQL fallback
+      historical_timeline = Rails.cache.fetch("#{cache_key_base}/chart_historical", expires_in: 1.hour) do
        historical_start = 23.hours.ago.beginning_of_hour
        current_hour_start = Time.current.beginning_of_hour

-        events_by_hour = with_duckdb_fallback { EventDdb.hourly_timeline(historical_start, current_hour_start) } ||
-          Event.where("timestamp >= ? AND timestamp < ?", historical_start, current_hour_start)
-            .group("DATE_TRUNC('hour', timestamp)")
-            .count
+        # Use DuckDB directly - if it fails, we'll show empty data rather than slow PostgreSQL
+        events_by_hour = BaffleDl.hourly_timeline(historical_start, current_hour_start) || {}

        (1..23).map do |hour_ago|
          hour_time = hour_ago.hours.ago.beginning_of_hour
@@ -278,7 +278,7 @@ class AnalyticsController < ApplicationController
            time_iso: hour_time.iso8601,
            total: events_by_hour[hour_key] || 0
          }
-        end
+        end.reverse
       end

       # Current hour (0 hours ago) - cache very briefly since it's actively accumulating
@@ -295,6 +295,30 @@ class AnalyticsController < ApplicationController
       # Combine current + historical for full 24-hour timeline
       timeline_data = [current_hour_data] + historical_timeline

+    when :week, :month
+      # Show daily data for week/month views
+      days_to_show = @time_period == :week ? 7 : 30
+      timeline_data = Rails.cache.fetch("#{cache_key_base}/chart_daily_#{days_to_show}", expires_in: cache_ttl) do
+        historical_start = days_to_show.days.ago.beginning_of_day
+        current_day_end = Time.current.end_of_day
+
+        # Use DuckDB for all data including current day (max 1 minute delay)
+        daily_events = BaffleDl.daily_timeline(historical_start, current_day_end) || {}
+
+        (0..days_to_show-1).map do |days_ago|
+          day_time = days_ago.days.ago.beginning_of_day
+          {
+            time_iso: day_time.iso8601,
+            total: daily_events[day_time] || 0
+          }
+        end
+      end
+
+    else
+      # Default to 24 hours
+      timeline_data = []
+    end
+
     # Action distribution and other chart data (cached with main cache)
     other_chart_data = Rails.cache.fetch("#{cache_key_base}/chart_metadata", expires_in: cache_ttl) do
       action_distribution = @event_breakdown.map do |action, count|
@@ -339,7 +363,7 @@ class AnalyticsController < ApplicationController
           time_iso: hour_time.iso8601,
           total: events_by_hour[hour_key] || 0
         }
-      end
+      end.reverse

       # Action distribution for pie chart
       action_distribution = @event_breakdown.map do |action, count|
@@ -364,8 +388,8 @@ class AnalyticsController < ApplicationController
   end

   def calculate_network_type_stats(start_time)
-    # Try DuckDB first, fallback to PostgreSQL
-    duckdb_stats = with_duckdb_fallback { EventDdb.network_type_stats(start_time) }
+    # Try DuckLake first, fallback to PostgreSQL
+    duckdb_stats = with_duckdb_fallback { BaffleDl.network_type_stats(start_time) }

     return duckdb_stats if duckdb_stats

@@ -413,8 +437,8 @@ class AnalyticsController < ApplicationController
   end

   def calculate_suspicious_patterns(start_time)
-    # Try DuckDB first, fallback to PostgreSQL
-    duckdb_patterns = with_duckdb_fallback { EventDdb.suspicious_patterns(start_time) }
+    # Try DuckLake first, fallback to PostgreSQL
+    duckdb_patterns = with_duckdb_fallback { BaffleDl.suspicious_patterns(start_time) }

     return duckdb_patterns if duckdb_patterns

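The controller hunks above keep calling a `with_duckdb_fallback` helper that is never defined in this compare view. As a point of reference only, a minimal sketch of what such a helper could look like, assuming it simply rescues analytics-backend errors and returns nil so the chained `|| <PostgreSQL query>` at each call site takes over; the module name and log text below are illustrative, not code from the repository:

```ruby
# Hypothetical sketch - the real helper is not part of this diff.
module DuckdbFallback
  private

  # Yields the analytics query; on any error, logs and returns nil so the
  # caller's chained `|| <ActiveRecord query>` fallback runs instead.
  def with_duckdb_fallback
    yield
  rescue StandardError => e
    Rails.logger.warn "[Analytics] DuckDB/DuckLake unavailable, falling back to PostgreSQL: #{e.message}"
    nil
  end
end
```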
@@ -20,37 +20,71 @@ class EventsController < ApplicationController
   end

   def index
-    @events = Event.includes(:network_range, :rule).order(timestamp: :desc)
-    Rails.logger.debug "Found #{@events.count} total events"
-    Rails.logger.debug "Action: #{params[:waf_action]}"
+    # Build filters hash from params
+    filters = {}
+    filters[:ip] = params[:ip] if params[:ip].present?
+    filters[:waf_action] = params[:waf_action] if params[:waf_action].present?
+    filters[:country] = params[:country] if params[:country].present?
+    filters[:rule_id] = params[:rule_id] if params[:rule_id].present?
+    filters[:company] = params[:company] if params[:company].present?
+    filters[:network_type] = params[:network_type] if params[:network_type].present?
+    filters[:asn] = params[:asn] if params[:asn].present?
+    filters[:exclude_bots] = params[:exclude_bots] if params[:exclude_bots] == "true"

-    # Apply filters
+    # Handle network_cidr filter (requires NetworkRange lookup)
+    if params[:network_cidr].present?
+      range = NetworkRange.find_by(network: params[:network_cidr])
+      filters[:network_range_id] = range.id if range
+    end
+
+    # Try DuckLake first, fallback to PostgreSQL if unavailable
+    result = BaffleDl.search(filters, page: params[:page]&.to_i || 1, per_page: 50)
+
+    if result
+      # DuckDB query succeeded
+      @pagy = Pagy.new(count: result[:total_count], page: result[:page], items: result[:per_page])
+      @events = result[:events]
+
+      # Load network_range associations for events that have network_range_id
+      network_range_ids = @events.map(&:network_range_id).compact.uniq
+      if network_range_ids.any?
+        network_ranges = NetworkRange.where(id: network_range_ids).index_by(&:id)
+        @events.each do |event|
+          event.network_range = network_ranges[event.network_range_id] if event.network_range_id
+        end
+      end
+
+      # Load rule associations if needed
+      rule_ids = @events.map(&:rule_id).compact.uniq
+      if rule_ids.any?
+        rules = Rule.where(id: rule_ids).index_by(&:id)
+        @events.each do |event|
+          event.rule = rules[event.rule_id] if event.rule_id
+        end
+      end
+
+      Rails.logger.debug "[DuckDB] Found #{result[:total_count]} total events, showing page #{result[:page]}"
+    else
+      # Fallback to PostgreSQL
+      Rails.logger.warn "[EventsController] DuckDB unavailable, falling back to PostgreSQL"
+
+      @events = Event.includes(:network_range, :rule).order(timestamp: :desc)
+
+      # Apply filters using ActiveRecord scopes
       @events = @events.by_ip(params[:ip]) if params[:ip].present?
       @events = @events.by_waf_action(params[:waf_action]) if params[:waf_action].present?
       @events = @events.by_country(params[:country]) if params[:country].present?
       @events = @events.where(rule_id: params[:rule_id]) if params[:rule_id].present?

-    # Network-based filters (now using denormalized columns)
       @events = @events.by_company(params[:company]) if params[:company].present?
       @events = @events.by_network_type(params[:network_type]) if params[:network_type].present?
       @events = @events.by_asn(params[:asn]) if params[:asn].present?
       @events = @events.by_network_cidr(params[:network_cidr]) if params[:network_cidr].present?

-    # Bot filtering
       @events = @events.exclude_bots if params[:exclude_bots] == "true"

-    Rails.logger.debug "Events count after filtering: #{@events.count}"

-    # Debug info
-    Rails.logger.debug "Events count before pagination: #{@events.count}"

       # Paginate
       @pagy, @events = pagy(@events, items: 50)

-    # Network ranges are now preloaded via includes(:network_range)
-    # The denormalized network_range_id makes this much faster than IP containment lookups
+      Rails.logger.debug "[PostgreSQL] Events count: #{@pagy.count} total, #{@pagy.pages} pages"
+    end

-    Rails.logger.debug "Events count after pagination: #{@events.count}"
-    Rails.logger.debug "Pagy info: #{@pagy.count} total, #{@pagy.pages} pages"
   end
 end

@@ -256,16 +256,16 @@ class NetworkRangesController < ApplicationController

   def calculate_traffic_stats(network_range)
     if network_range.persisted?
-      # Real network - use cached events_count for total requests (much more performant)
-      if network_range.events_count > 0
+      # Real network - check if network has events using DuckDB for performance
+      if network_range.has_events?
         # Use indexed network_range_id for much better performance instead of expensive CIDR operator
         # Include child network ranges to capture all traffic within this network block
         network_ids = [network_range.id] + network_range.child_ranges.pluck(:id)

-        # Try DuckDB first for stats (much faster)
-        duckdb_stats = with_duckdb_fallback { EventDdb.network_traffic_stats(network_ids) }
-        duckdb_top_paths = with_duckdb_fallback { EventDdb.network_top_paths(network_ids, 10) }
-        duckdb_top_agents = with_duckdb_fallback { EventDdb.network_top_user_agents(network_ids, 5) }
+        # Try DuckLake first for stats (much faster)
+        duckdb_stats = with_duckdb_fallback { BaffleDl.network_traffic_stats(network_ids) }
+        duckdb_top_paths = with_duckdb_fallback { BaffleDl.network_top_paths(network_ids, 10) }
+        duckdb_top_agents = with_duckdb_fallback { BaffleDl.network_top_user_agents(network_ids, 5) }

         if duckdb_stats
           # DuckDB success - use fast aggregated stats

@@ -40,9 +40,13 @@ class OidcAuthController < ApplicationController

     # Add PKCE verifier if available
     code_verifier = retrieve_pkce_verifier
-    oidc_client.code_verifier = code_verifier if code_verifier.present?

-    access_token = oidc_client.access_token!
+    # Pass code_verifier as parameter to access_token! method (PKCE support)
+    access_token = if code_verifier.present?
+      oidc_client.access_token!(:body, code_verifier: code_verifier)
+    else
+      oidc_client.access_token!
+    end

     # Extract claims from ID token (JWT-only approach)
     id_token = access_token.id_token
@@ -171,7 +175,7 @@ class OidcAuthController < ApplicationController
   # JWT claim extraction and validation
   def extract_claims_from_id_token(id_token)
     # Decode JWT without verification first to get claims
-    decoded_jwt = JWT.decode(id_token, nil, false).first
+    decoded_jwt = JSON::JWT.decode(id_token, :skip_verification)

     {
       sub: decoded_jwt['sub'],

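The PKCE change above relies on a `retrieve_pkce_verifier` helper that sits outside this hunk. For orientation, a hedged sketch of how a verifier/challenge pair is typically produced and stashed in the session ahead of the `access_token!(:body, code_verifier: ...)` exchange (RFC 7636, S256 method); the session key and helper names below are assumptions, not code from this repository:

```ruby
# Illustrative sketch only - these helpers are referenced but not defined in the diff.
require "securerandom"
require "digest"
require "base64"

# Called before redirecting to the authorization endpoint: the returned
# challenge goes on the authorize URL (code_challenge_method=S256),
# while the verifier stays server-side in the session.
def store_pkce_verifier
  verifier = SecureRandom.urlsafe_base64(64)
  session[:pkce_code_verifier] = verifier
  Base64.urlsafe_encode64(Digest::SHA256.digest(verifier), padding: false) # code_challenge
end

# Called in the callback, as in the hunk above, before exchanging the code.
def retrieve_pkce_verifier
  session.delete(:pkce_code_verifier)
end
```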
@@ -37,20 +37,49 @@ export default class extends Controller {

     // Convert ISO time to local time
     const date = new Date(timeIso)
-    const localTime = date.toLocaleTimeString(undefined, {
+    // Determine if we should show date based on time range
+    const now = new Date()
+    const timeDiff = now - date
+    const hoursDiff = timeDiff / (1000 * 60 * 60)
+
+    let displayTime
+    if (hoursDiff > 25) {
+      // For periods longer than 25 hours, show date only (no time)
+      displayTime = date.toLocaleDateString(undefined, {
+        month: 'short',
+        day: 'numeric'
+      })
+    } else {
+      // Check if this is midnight UTC data (daily timeline) vs actual time data (hourly timeline)
+      // Daily timeline: time is at UTC midnight (hours/minutes/seconds = 0)
+      // Hourly timeline: time has actual hours/minutes
+      const utcHours = date.getUTCHours()
+      const utcMinutes = date.getUTCMinutes()
+      const utcSeconds = date.getUTCSeconds()
+
+      if (utcHours === 0 && utcMinutes === 0 && utcSeconds === 0) {
+        // This is midnight UTC - treat as daily data, show date only
+        displayTime = date.toLocaleDateString(undefined, {
+          month: 'short',
+          day: 'numeric'
+        })
+      } else {
+        // This is actual time data - show time only
+        displayTime = date.toLocaleTimeString(undefined, {
           hour: '2-digit',
           minute: '2-digit',
           hour12: false
         })
+      }
+    }

-    timeElement.textContent = localTime
+    timeElement.textContent = displayTime
     timeElement.title = date.toLocaleString(undefined, {
       weekday: 'short',
       year: 'numeric',
       month: 'short',
       day: 'numeric',
-      hour: '2-digit',
-      minute: '2-digit',
       timeZoneName: 'short'
     })

app/jobs/bootstrap_parquet_export_job.rb (new file, 52 lines)
@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+# One-time job to bootstrap Parquet export system
+# Exports all existing DuckDB data to weekly Parquet archives
+# Run this once when setting up Parquet exports for the first time
+#
+# Usage:
+#   BootstrapParquetExportJob.perform_now
+#   # or via docker:
+#   docker compose exec jobs bin/rails runner "BootstrapParquetExportJob.perform_now"
+class BootstrapParquetExportJob < ApplicationJob
+  queue_as :default
+
+  def perform
+    service = AnalyticsDuckdbService.instance
+
+    # Check if DuckDB has any data
+    event_count = service.event_count
+    Rails.logger.info "[Parquet Bootstrap] DuckDB event count: #{event_count}"
+
+    if event_count == 0
+      Rails.logger.warn "[Parquet Bootstrap] No events in DuckDB. Run SyncEventsToDuckdbJob first."
+      return
+    end
+
+    # Check if Parquet files already exist
+    existing_weeks = Dir.glob(AnalyticsDuckdbService::PARQUET_WEEKS_PATH.join("*.parquet")).size
+    if existing_weeks > 0
+      Rails.logger.info "[Parquet Bootstrap] Found #{existing_weeks} existing week archives"
+    end
+
+    Rails.logger.info "[Parquet Bootstrap] Starting export of all DuckDB data to Parquet..."
+
+    start_time = Time.current
+
+    # Run the bootstrap export
+    service.export_all_to_parquet
+
+    duration = Time.current - start_time
+    week_count = Dir.glob(AnalyticsDuckdbService::PARQUET_WEEKS_PATH.join("*.parquet")).size
+
+    Rails.logger.info "[Parquet Bootstrap] Complete!"
+    Rails.logger.info "[Parquet Bootstrap] - Time taken: #{duration.round(2)} seconds"
+    Rails.logger.info "[Parquet Bootstrap] - Week archives: #{week_count}"
+    Rails.logger.info "[Parquet Bootstrap] - Storage: #{AnalyticsDuckdbService::PARQUET_BASE_PATH}"
+    Rails.logger.info "[Parquet Bootstrap] System is ready - jobs will maintain exports automatically"
+  rescue StandardError => e
+    Rails.logger.error "[Parquet Bootstrap] Job failed: #{e.message}"
+    Rails.logger.error e.backtrace.join("\n")
+    raise # Re-raise to mark job as failed
+  end
+end

app/jobs/consolidate_parquet_hourly_job.rb (new file, 25 lines)
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+# Background job to consolidate completed hour into day file
+# Runs at :05 past each hour (e.g., 01:05, 02:05, etc.)
+# Merges the previous hour's data into the day file and deletes the hour file
+class ConsolidateParquetHourlyJob < ApplicationJob
+  queue_as :default
+
+  def perform
+    service = AnalyticsDuckdbService.instance
+
+    # Consolidate the previous hour (not current hour, which is still being written)
+    previous_hour = 1.hour.ago
+
+    Rails.logger.info "[Parquet Consolidate] Starting hourly consolidation for #{previous_hour.strftime('%Y-%m-%d %H:00')}"
+
+    service.consolidate_hour_to_day(previous_hour)
+
+    Rails.logger.info "[Parquet Consolidate] Hourly consolidation complete"
+  rescue StandardError => e
+    Rails.logger.error "[Parquet Consolidate] Hourly job failed: #{e.message}"
+    Rails.logger.error e.backtrace.join("\n")
+    raise # Re-raise to mark job as failed in Solid Queue
+  end
+end

app/jobs/consolidate_parquet_weekly_job.rb (new file, 25 lines)
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+# Background job to consolidate completed week into archive
+# Runs Monday at 00:05 (start of new week)
+# Merges the previous week's day files into a week archive and deletes day files
+class ConsolidateParquetWeeklyJob < ApplicationJob
+  queue_as :default
+
+  def perform
+    service = AnalyticsDuckdbService.instance
+
+    # Consolidate the previous week (Monday to Sunday)
+    previous_week_start = 1.week.ago.beginning_of_week
+
+    Rails.logger.info "[Parquet Consolidate] Starting weekly consolidation for week starting #{previous_week_start.strftime('%Y-%m-%d')}"
+
+    service.consolidate_days_to_week(previous_week_start)
+
+    Rails.logger.info "[Parquet Consolidate] Weekly consolidation complete"
+  rescue StandardError => e
+    Rails.logger.error "[Parquet Consolidate] Weekly job failed: #{e.message}"
+    Rails.logger.error e.backtrace.join("\n")
+    raise # Re-raise to mark job as failed in Solid Queue
+  end
+end

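Taken together with the bootstrap job above, the intended cadence (per the comments in each job) can be exercised manually from a Rails console; wiring this into a scheduler such as Solid Queue recurring tasks is not part of this compare:

```ruby
# Console sketch - actual scheduling is assumed to live outside this diff.
BootstrapParquetExportJob.perform_now      # one-time backfill of existing DuckDB data
ConsolidateParquetHourlyJob.perform_now    # meant to run at :05 past each hour
ConsolidateParquetWeeklyJob.perform_now    # meant to run Mondays at 00:05
```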
@@ -79,6 +79,7 @@ class ProcessWafEventJob < ApplicationJob
       Rails.logger.debug "Network processing took #{((Time.current - network_start) * 1000).round(2)}ms"
     rescue => e
       Rails.logger.warn "Failed to process network range for event #{event.id}: #{e.message}"
+      Sentry.capture_exception(e)
     end
   elsif event.ip_address.present?
     Rails.logger.warn "Event #{event.id} has IP but no network_range_id (private IP?)"

@@ -1,89 +0,0 @@
-# frozen_string_literal: true
-
-# Background job to sync events from PostgreSQL to DuckDB
-# Runs every 5 minutes to keep analytics database up-to-date
-# Uses watermark tracking to only sync new events
-class SyncEventsToDuckdbJob < ApplicationJob
-  queue_as :default
-
-  # Key for storing last sync timestamp in Rails cache
-  WATERMARK_CACHE_KEY = "duckdb_last_sync_time"
-  WATERMARK_TTL = 1.week
-
-  # Overlap window to catch late-arriving events
-  SYNC_OVERLAP = 1.minute
-
-  def perform
-    service = AnalyticsDuckdbService.instance
-
-    # Determine where to start syncing
-    from_timestamp = determine_sync_start_time(service)
-
-    Rails.logger.info "[DuckDB Sync] Starting sync from #{from_timestamp}"
-
-    # Sync new events using PostgreSQL cursor + DuckDB Appender
-    # (setup_schema is called internally within sync_new_events)
-    count = service.sync_new_events(from_timestamp)
-
-    # Update watermark if we synced any events
-    if count > 0
-      update_last_sync_time
-      Rails.logger.info "[DuckDB Sync] Successfully synced #{count} events"
-    else
-      Rails.logger.info "[DuckDB Sync] No new events to sync"
-    end
-  rescue StandardError => e
-    Rails.logger.error "[DuckDB Sync] Job failed: #{e.message}"
-    Rails.logger.error e.backtrace.join("\n")
-    raise # Re-raise to mark job as failed in Solid Queue
-  end
-
-  private
-
-  # Determine timestamp to start syncing from
-  # Strategy:
-  # 1. First run (DuckDB empty): sync from oldest PostgreSQL event
-  # 2. Subsequent runs: sync from last watermark with overlap
-  def determine_sync_start_time(service)
-    oldest_duckdb = service.oldest_event_timestamp
-
-    if oldest_duckdb.nil?
-      # DuckDB is empty - this is the first sync
-      # Start from oldest PostgreSQL event (or reasonable cutoff)
-      oldest_pg = Event.minimum(:timestamp)
-
-      if oldest_pg.nil?
-        # No events in PostgreSQL at all
-        Rails.logger.warn "[DuckDB Sync] No events found in PostgreSQL"
-        1.day.ago # Default to recent window
-      else
-        Rails.logger.info "[DuckDB Sync] First sync - starting from oldest event: #{oldest_pg}"
-        oldest_pg
-      end
-    else
-      # DuckDB has data - sync from last watermark with overlap
-      last_sync = Rails.cache.read(WATERMARK_CACHE_KEY)
-
-      if last_sync.nil?
-        # Watermark not in cache (maybe cache expired or restarted)
-        # Fall back to newest event in DuckDB
-        newest_duckdb = service.newest_event_timestamp
-        start_time = newest_duckdb ? newest_duckdb - SYNC_OVERLAP : oldest_duckdb
-        Rails.logger.info "[DuckDB Sync] Watermark not found, using newest DuckDB event: #{start_time}"
-        start_time
-      else
-        # Normal case: use watermark with overlap to catch late arrivals
-        start_time = last_sync - SYNC_OVERLAP
-        Rails.logger.debug "[DuckDB Sync] Using watermark: #{last_sync} (with #{SYNC_OVERLAP}s overlap)"
-        start_time
-      end
-    end
-  end
-
-  # Update last sync watermark in cache
-  def update_last_sync_time
-    now = Time.current
-    Rails.cache.write(WATERMARK_CACHE_KEY, now, expires_in: WATERMARK_TTL)
-    Rails.logger.debug "[DuckDB Sync] Updated watermark to #{now}"
-  end
-end

@@ -2,19 +2,54 @@
|
|||||||
|
|
||||||
require 'ostruct'
|
require 'ostruct'
|
||||||
|
|
||||||
# EventDdb - DuckDB-backed analytics queries for events
|
# EventDdb - DuckLake-backed analytics queries for events
|
||||||
# Provides an ActiveRecord-like interface for querying DuckDB events table
|
# Provides an ActiveRecord-like interface for querying DuckLake events table
|
||||||
# Falls back to PostgreSQL Event model if DuckDB is unavailable
|
# Falls back to PostgreSQL Event model if DuckLake is unavailable
|
||||||
class EventDdb
|
class EventDdb
|
||||||
|
# Enum mappings from integer to string (matching Event model)
|
||||||
|
ACTION_MAP = {
|
||||||
|
0 => "deny",
|
||||||
|
1 => "allow",
|
||||||
|
2 => "redirect",
|
||||||
|
3 => "challenge",
|
||||||
|
4 => "log"
|
||||||
|
}.freeze
|
||||||
|
|
||||||
|
METHOD_MAP = {
|
||||||
|
0 => "get",
|
||||||
|
1 => "post",
|
||||||
|
2 => "put",
|
||||||
|
3 => "patch",
|
||||||
|
4 => "delete",
|
||||||
|
5 => "head",
|
||||||
|
6 => "options"
|
||||||
|
}.freeze
|
||||||
|
|
||||||
class << self
|
class << self
|
||||||
# Get DuckDB service
|
# Get DuckLake service
|
||||||
def service
|
def service
|
||||||
AnalyticsDuckdbService.instance
|
AnalyticsDucklakeService.new
|
||||||
|
end
|
||||||
|
|
||||||
|
# Helper to work with DuckLake events table
|
||||||
|
# This allows all existing queries to work without modification
|
||||||
|
def with_events_from_parquet(&block)
|
||||||
|
service.with_connection do |conn|
|
||||||
|
# Ensure schema exists
|
||||||
|
service.setup_schema(conn)
|
||||||
|
|
||||||
|
# Use the DuckLake events table directly
|
||||||
|
# DuckLake automatically manages the Parquet files underneath
|
||||||
|
yield conn
|
||||||
|
end
|
||||||
|
rescue StandardError => e
|
||||||
|
Rails.logger.error "[EventDdb] Error accessing DuckLake events: #{e.message}"
|
||||||
|
nil
|
||||||
end
|
end
|
||||||
|
|
||||||
# Total events since timestamp
|
# Total events since timestamp
|
||||||
def count_since(start_time)
|
def count_since(start_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query("SELECT COUNT(*) as count FROM events WHERE timestamp >= ?", start_time)
|
result = conn.query("SELECT COUNT(*) as count FROM events WHERE timestamp >= ?", start_time)
|
||||||
result.first&.first || 0
|
result.first&.first || 0
|
||||||
end
|
end
|
||||||
@@ -25,7 +60,7 @@ class EventDdb
|
|||||||
|
|
||||||
# Event breakdown by WAF action
|
# Event breakdown by WAF action
|
||||||
def breakdown_by_action(start_time)
|
def breakdown_by_action(start_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time)
|
result = conn.query(<<~SQL, start_time)
|
||||||
SELECT waf_action, COUNT(*) as count
|
SELECT waf_action, COUNT(*) as count
|
||||||
FROM events
|
FROM events
|
||||||
@@ -46,7 +81,7 @@ class EventDdb
|
|||||||
|
|
||||||
# Top countries with event counts
|
# Top countries with event counts
|
||||||
def top_countries(start_time, limit = 10)
|
def top_countries(start_time, limit = 10)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time, limit)
|
result = conn.query(<<~SQL, start_time, limit)
|
||||||
SELECT country, COUNT(*) as count
|
SELECT country, COUNT(*) as count
|
||||||
FROM events
|
FROM events
|
||||||
@@ -67,7 +102,7 @@ class EventDdb
|
|||||||
|
|
||||||
# Top blocked IPs
|
# Top blocked IPs
|
||||||
def top_blocked_ips(start_time, limit = 10)
|
def top_blocked_ips(start_time, limit = 10)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time, limit)
|
result = conn.query(<<~SQL, start_time, limit)
|
||||||
SELECT ip_address, COUNT(*) as count
|
SELECT ip_address, COUNT(*) as count
|
||||||
FROM events
|
FROM events
|
||||||
@@ -87,7 +122,7 @@ class EventDdb
|
|||||||
|
|
||||||
# Hourly timeline aggregation
|
# Hourly timeline aggregation
|
||||||
def hourly_timeline(start_time, end_time)
|
def hourly_timeline(start_time, end_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time, end_time)
|
result = conn.query(<<~SQL, start_time, end_time)
|
||||||
SELECT
|
SELECT
|
||||||
DATE_TRUNC('hour', timestamp) as hour,
|
DATE_TRUNC('hour', timestamp) as hour,
|
||||||
@@ -110,7 +145,7 @@ class EventDdb
|
|||||||
# Top networks by traffic volume
|
# Top networks by traffic volume
|
||||||
# Returns array of arrays: [network_range_id, event_count, unique_ips]
|
# Returns array of arrays: [network_range_id, event_count, unique_ips]
|
||||||
def top_networks(start_time, limit = 50)
|
def top_networks(start_time, limit = 50)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time, limit)
|
result = conn.query(<<~SQL, start_time, limit)
|
||||||
SELECT
|
SELECT
|
||||||
network_range_id,
|
network_range_id,
|
||||||
@@ -133,7 +168,7 @@ class EventDdb
|
|||||||
# Top companies
|
# Top companies
|
||||||
# Returns array of OpenStruct objects with: company, event_count, unique_ips, network_count
|
# Returns array of OpenStruct objects with: company, event_count, unique_ips, network_count
|
||||||
def top_companies(start_time, limit = 20)
|
def top_companies(start_time, limit = 20)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time, limit)
|
result = conn.query(<<~SQL, start_time, limit)
|
||||||
SELECT
|
SELECT
|
||||||
company,
|
company,
|
||||||
@@ -165,7 +200,7 @@ class EventDdb
|
|||||||
# Top ASNs
|
# Top ASNs
|
||||||
# Returns array of OpenStruct objects with: asn, asn_org, event_count, unique_ips, network_count
|
# Returns array of OpenStruct objects with: asn, asn_org, event_count, unique_ips, network_count
|
||||||
def top_asns(start_time, limit = 15)
|
def top_asns(start_time, limit = 15)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time, limit)
|
result = conn.query(<<~SQL, start_time, limit)
|
||||||
SELECT
|
SELECT
|
||||||
asn,
|
asn,
|
||||||
@@ -199,7 +234,7 @@ class EventDdb
|
|||||||
# Network type breakdown (datacenter, VPN, proxy, standard)
|
# Network type breakdown (datacenter, VPN, proxy, standard)
|
||||||
# Returns hash with network_type as key and hash of stats as value
|
# Returns hash with network_type as key and hash of stats as value
|
||||||
def network_type_breakdown(start_time)
|
def network_type_breakdown(start_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time)
|
result = conn.query(<<~SQL, start_time)
|
||||||
SELECT
|
SELECT
|
||||||
CASE
|
CASE
|
||||||
@@ -236,7 +271,7 @@ class EventDdb
|
|||||||
# Top countries with detailed stats (event count and unique IPs)
|
# Top countries with detailed stats (event count and unique IPs)
|
||||||
# Returns array of OpenStruct objects with: country, event_count, unique_ips
|
# Returns array of OpenStruct objects with: country, event_count, unique_ips
|
||||||
def top_countries_with_stats(start_time, limit = 15)
|
def top_countries_with_stats(start_time, limit = 15)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time, limit)
|
result = conn.query(<<~SQL, start_time, limit)
|
||||||
SELECT
|
SELECT
|
||||||
country,
|
country,
|
||||||
@@ -266,7 +301,7 @@ class EventDdb
|
|||||||
# Network type stats with formatted output matching controller expectations
|
# Network type stats with formatted output matching controller expectations
|
||||||
# Returns hash with type keys containing label, networks, events, unique_ips, percentage
|
# Returns hash with type keys containing label, networks, events, unique_ips, percentage
|
||||||
def network_type_stats(start_time)
|
def network_type_stats(start_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
# Get total events for percentage calculation
|
# Get total events for percentage calculation
|
||||||
total_result = conn.query("SELECT COUNT(*) as total FROM events WHERE timestamp >= ?", start_time)
|
total_result = conn.query("SELECT COUNT(*) as total FROM events WHERE timestamp >= ?", start_time)
|
||||||
total_events = total_result.first&.first || 0
|
total_events = total_result.first&.first || 0
|
||||||
@@ -309,7 +344,7 @@ class EventDdb
|
|||||||
network_range_ids = Array(network_range_ids)
|
network_range_ids = Array(network_range_ids)
|
||||||
return nil if network_range_ids.empty?
|
return nil if network_range_ids.empty?
|
||||||
|
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
# Build IN clause with placeholders
|
# Build IN clause with placeholders
|
||||||
placeholders = network_range_ids.map { "?" }.join(", ")
|
placeholders = network_range_ids.map { "?" }.join(", ")
|
||||||
|
|
||||||
@@ -344,7 +379,7 @@ class EventDdb
|
|||||||
network_range_ids = Array(network_range_ids)
|
network_range_ids = Array(network_range_ids)
|
||||||
return nil if network_range_ids.empty?
|
return nil if network_range_ids.empty?
|
||||||
|
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
# Build IN clause with placeholders
|
# Build IN clause with placeholders
|
||||||
placeholders = network_range_ids.map { "?" }.join(", ")
|
placeholders = network_range_ids.map { "?" }.join(", ")
|
||||||
|
|
||||||
@@ -372,7 +407,7 @@ class EventDdb
|
|||||||
network_range_ids = Array(network_range_ids)
|
network_range_ids = Array(network_range_ids)
|
||||||
return nil if network_range_ids.empty?
|
return nil if network_range_ids.empty?
|
||||||
|
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
# Build IN clause with placeholders
|
# Build IN clause with placeholders
|
||||||
placeholders = network_range_ids.map { "?" }.join(", ")
|
placeholders = network_range_ids.map { "?" }.join(", ")
|
||||||
|
|
||||||
@@ -395,13 +430,36 @@ class EventDdb
|
|||||||
nil
|
nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Count events for network range(s)
|
||||||
|
# Returns integer count of all events in the network
|
||||||
|
def network_event_count(network_range_ids)
|
||||||
|
network_range_ids = Array(network_range_ids)
|
||||||
|
return nil if network_range_ids.empty?
|
||||||
|
|
||||||
|
with_events_from_parquet do |conn|
|
||||||
|
# Build IN clause with placeholders
|
||||||
|
placeholders = network_range_ids.map { "?" }.join(", ")
|
||||||
|
|
||||||
|
result = conn.query(<<~SQL, *network_range_ids)
|
||||||
|
SELECT COUNT(*) as count
|
||||||
|
FROM events
|
||||||
|
WHERE network_range_id IN (#{placeholders})
|
||||||
|
SQL
|
||||||
|
|
||||||
|
result.first&.first || 0
|
||||||
|
end
|
||||||
|
rescue StandardError => e
|
||||||
|
Rails.logger.error "[EventDdb] Error in network_event_count: #{e.message}"
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
|
||||||
# Full user agent tally for network range(s)
|
# Full user agent tally for network range(s)
|
||||||
# Returns hash of user_agent => count for all agents in the network
|
# Returns hash of user_agent => count for all agents in the network
|
||||||
def network_agent_tally(network_range_ids)
|
def network_agent_tally(network_range_ids)
|
||||||
network_range_ids = Array(network_range_ids)
|
network_range_ids = Array(network_range_ids)
|
||||||
return nil if network_range_ids.empty?
|
return nil if network_range_ids.empty?
|
||||||
|
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
# Build IN clause with placeholders
|
# Build IN clause with placeholders
|
||||||
placeholders = network_range_ids.map { "?" }.join(", ")
|
placeholders = network_range_ids.map { "?" }.join(", ")
|
||||||
|
|
||||||
@@ -426,7 +484,7 @@ class EventDdb
|
|||||||
# Suspicious network activity patterns
|
# Suspicious network activity patterns
|
||||||
# Detects high-volume networks, high deny rates, and distributed companies
|
# Detects high-volume networks, high deny rates, and distributed companies
|
||||||
def suspicious_patterns(start_time)
|
def suspicious_patterns(start_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
# High volume networks (5x average)
|
# High volume networks (5x average)
|
||||||
avg_query = conn.query(<<~SQL, start_time)
|
avg_query = conn.query(<<~SQL, start_time)
|
||||||
SELECT
|
SELECT
|
||||||
@@ -504,7 +562,7 @@ class EventDdb
|
|||||||
|
|
||||||
# Bot traffic analysis - breakdown of bot vs human traffic
|
# Bot traffic analysis - breakdown of bot vs human traffic
|
||||||
def bot_traffic_breakdown(start_time)
|
def bot_traffic_breakdown(start_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time)
|
result = conn.query(<<~SQL, start_time)
|
||||||
SELECT
|
SELECT
|
||||||
is_bot,
|
is_bot,
|
||||||
@@ -534,7 +592,7 @@ class EventDdb
|
|||||||
|
|
||||||
# Count human traffic (non-bot) since timestamp
|
# Count human traffic (non-bot) since timestamp
|
||||||
def human_traffic_count(start_time)
|
def human_traffic_count(start_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time)
|
result = conn.query(<<~SQL, start_time)
|
||||||
SELECT COUNT(*) as count
|
SELECT COUNT(*) as count
|
||||||
FROM events
|
FROM events
|
||||||
@@ -550,7 +608,7 @@ class EventDdb
|
|||||||
|
|
||||||
# Count bot traffic since timestamp
|
# Count bot traffic since timestamp
|
||||||
def bot_traffic_count(start_time)
|
def bot_traffic_count(start_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time)
|
result = conn.query(<<~SQL, start_time)
|
||||||
SELECT COUNT(*) as count
|
SELECT COUNT(*) as count
|
||||||
FROM events
|
FROM events
|
||||||
@@ -566,7 +624,7 @@ class EventDdb
|
|||||||
|
|
||||||
# Top bot user agents
|
# Top bot user agents
|
||||||
def top_bot_user_agents(start_time, limit = 20)
|
def top_bot_user_agents(start_time, limit = 20)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time, limit)
|
result = conn.query(<<~SQL, start_time, limit)
|
||||||
SELECT
|
SELECT
|
||||||
user_agent,
|
user_agent,
|
||||||
@@ -595,7 +653,7 @@ class EventDdb
|
|||||||
|
|
||||||
# Bot traffic timeline (hourly breakdown)
|
# Bot traffic timeline (hourly breakdown)
|
||||||
def bot_traffic_timeline(start_time, end_time)
|
def bot_traffic_timeline(start_time, end_time)
|
||||||
service.with_connection do |conn|
|
with_events_from_parquet do |conn|
|
||||||
result = conn.query(<<~SQL, start_time, end_time)
|
result = conn.query(<<~SQL, start_time, end_time)
|
||||||
SELECT
|
SELECT
|
||||||
DATE_TRUNC('hour', timestamp) as hour,
|
DATE_TRUNC('hour', timestamp) as hour,
|
||||||
@@ -624,5 +682,157 @@ class EventDdb
|
|||||||
Rails.logger.error "[EventDdb] Error in bot_traffic_timeline: #{e.message}"
|
Rails.logger.error "[EventDdb] Error in bot_traffic_timeline: #{e.message}"
|
||||||
nil
|
nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Search events with filters and pagination
|
||||||
|
# Returns { total_count:, events:[], page:, per_page: }
|
||||||
|
# Supports filters: ip, waf_action, country, rule_id, company, asn, network_type, network_range_id, exclude_bots, request_path
|
||||||
|
def search(filters = {}, page: 1, per_page: 50)
|
||||||
|
with_events_from_parquet do |conn|
|
||||||
|
# Build WHERE clause
|
||||||
|
where_clause, params = build_where_clause(filters)
|
||||||
|
|
||||||
|
# Get total count
|
||||||
|
count_sql = "SELECT COUNT(*) FROM baffle.events#{where_clause}"
|
||||||
|
count_result = conn.query(count_sql, *params)
|
||||||
|
total_count = count_result.first&.first || 0
|
||||||
|
|
||||||
|
# Get paginated results
|
||||||
|
offset = (page - 1) * per_page
|
||||||
|
|
||||||
|
data_sql = <<~SQL
|
||||||
|
SELECT
|
||||||
|
id, timestamp, ip_address, network_range_id, country, company,
|
||||||
|
+          asn, asn_org, is_datacenter, is_vpn, is_proxy, is_bot,
+          waf_action, request_method, response_status, rule_id,
+          request_path, user_agent, tags
+        FROM baffle.events
+        #{where_clause}
+        ORDER BY timestamp DESC
+        LIMIT ? OFFSET ?
+      SQL
+
+      result = conn.query(data_sql, *params, per_page, offset)
+
+      # Convert rows to event-like objects
+      events = result.to_a.map { |row| row_to_event(row) }
+
+      {
+        total_count: total_count,
+        events: events,
+        page: page,
+        per_page: per_page
+      }
+    end
+  rescue StandardError => e
+    Rails.logger.error "[EventDdb] Error in DuckLake search: #{e.message}"
+    Rails.logger.error e.backtrace.join("\n")
+    nil
+  end
+
+  private
+
+  # Build WHERE clause and params from filters hash
+  # Returns [where_clause_string, params_array]
+  def build_where_clause(filters)
+    conditions = []
+    params = []
+
+    if filters[:ip].present?
+      conditions << "ip_address = ?"
+      params << filters[:ip]
+    end
+
+    if filters[:waf_action].present?
+      # Convert string action to integer
+      action_int = ACTION_MAP.key(filters[:waf_action].to_s)
+      if action_int
+        conditions << "waf_action = ?"
+        params << action_int
+      end
+    end
+
+    if filters[:country].present?
+      conditions << "country = ?"
+      params << filters[:country]
+    end
+
+    if filters[:rule_id].present?
+      conditions << "rule_id = ?"
+      params << filters[:rule_id].to_i
+    end
+
+    if filters[:company].present?
+      conditions << "company ILIKE ?"
+      params << "%#{filters[:company]}%"
+    end
+
+    if filters[:asn].present?
+      conditions << "asn = ?"
+      params << filters[:asn].to_i
+    end
+
+    if filters[:network_range_id].present?
+      conditions << "network_range_id = ?"
+      params << filters[:network_range_id].to_i
+    end
+
+    # Network type filter
+    if filters[:network_type].present?
+      case filters[:network_type].to_s.downcase
+      when "datacenter"
+        conditions << "is_datacenter = true"
+      when "vpn"
+        conditions << "is_vpn = true"
+      when "proxy"
+        conditions << "is_proxy = true"
+      when "standard"
+        conditions << "(is_datacenter = false AND is_vpn = false AND is_proxy = false)"
+      end
+    end
+
+    # Path filtering
+    if filters[:request_path].present?
+      conditions << "request_path = ?"
+      params << filters[:request_path]
+    end
+
+    # Bot filtering
+    if filters[:exclude_bots] == true || filters[:exclude_bots] == "true"
+      conditions << "is_bot = false"
+    end
+
+    where_clause = conditions.any? ? " WHERE #{conditions.join(' AND ')}" : ""
+    [where_clause, params]
+  end
+
+  # Convert DuckDB row array to event-like OpenStruct
+  def row_to_event(row)
+    OpenStruct.new(
+      id: row[0],
+      timestamp: row[1],
+      ip_address: row[2],
+      network_range_id: row[3],
+      country: row[4],
+      company: row[5],
+      asn: row[6],
+      asn_org: row[7],
+      is_datacenter: row[8],
+      is_vpn: row[9],
+      is_proxy: row[10],
+      is_bot: row[11],
+      waf_action: ACTION_MAP[row[12]] || "unknown",
+      request_method: METHOD_MAP[row[13]],
+      response_status: row[14],
+      rule_id: row[15],
+      request_path: row[16],
+      user_agent: row[17],
+      tags: row[18] || [],
+      # Add helper method for country lookup
+      lookup_country: row[4],
+      # Network range will be loaded separately in controller
+      network_range: nil,
+      rule: nil
+    )
+  end
  end
end
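For illustration, a caller of the DuckLake-backed search above might look like the following. The EventDdb.search entry point and its return handling are assumed from this excerpt rather than shown in full, so treat this as a sketch; the filter keys mirror build_where_clause above.

# Hypothetical caller (names assumed), kept for illustration only.
filters = { country: "US", network_type: "datacenter", exclude_bots: true }
result  = EventDdb.search(filters, page: 1, per_page: 50)

if result # nil signals a DuckLake error; callers fall back to the PostgreSQL path
  result[:events].each do |event|
    puts "#{event.timestamp} #{event.ip_address} #{event.waf_action} #{event.request_path}"
  end
  puts "#{result[:total_count]} matching events"
end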
@@ -15,6 +15,7 @@ class NetworkRange < ApplicationRecord

  # Associations
  has_many :rules, dependent: :destroy
+ has_many :events, foreign_key: :network_range_id, dependent: :nullify
  belongs_to :user, optional: true

  # Validations
@@ -36,8 +37,8 @@ class NetworkRange < ApplicationRecord
  scope :geolite_imported, -> { where(source: ['geolite_asn', 'geolite_country']) }
  scope :geolite_asn, -> { where(source: 'geolite_asn') }
  scope :geolite_country, -> { where(source: 'geolite_country') }
- scope :with_events, -> { where("events_count > 0") }
- scope :most_active, -> { order(events_count: :desc) }
+ scope :with_events, -> { joins(:events).distinct }
+ scope :most_active, -> { joins(:events).group('network_ranges.id').order('COUNT(events.id) DESC') }

  # Callbacks
  before_validation :set_default_source
@@ -241,7 +242,7 @@ class NetworkRange < ApplicationRecord
  def agent_tally
    Rails.cache.fetch("#{cache_key}:agent_tally", expires_in: 5.minutes) do
      # Use DuckDB for fast agent tally instead of loading all events into memory
-     if persisted? && events_count > 0
+     if persisted? && has_events?
        # Include child network ranges to capture all traffic within this network block
        network_ids = [id] + child_ranges.pluck(:id)

@@ -417,10 +418,16 @@ class NetworkRange < ApplicationRecord
    cidr.to_s.gsub('/', '_')
  end

- # Analytics methods - events_count is now a counter cache column maintained by database triggers
- # This is much more performant than the previous implementation that did complex network queries
- def events_count
-   self[:events_count] || 0
+ # Check if network range has any events using DuckDB for performance
+ def has_events?
+   return false unless persisted?
+
+   # Include child network ranges to capture all traffic within this network block
+   network_ids = [id] + child_ranges.pluck(:id)
+
+   # Try DuckDB first for fast event count check
+   event_count = with_duckdb_fallback { EventDdb.network_event_count(network_ids) }
+   event_count&.positive? || events.exists?
  end

  def events
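The model now leans on with_duckdb_fallback, which is referenced here but defined elsewhere in the codebase. A minimal sketch of the pattern, with the helper's body assumed rather than taken from this diff:

# Sketch only - the real helper is not part of this excerpt.
def with_duckdb_fallback
  yield
rescue StandardError => e
  Rails.logger.warn "[DuckDB] falling back to ActiveRecord: #{e.message}"
  nil # callers treat nil as "use the PostgreSQL path instead"
end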
@@ -152,16 +152,41 @@ validate :targets_must_be_array
  def create_rule_for_network_range(network_range)
    return nil unless matches_network_range?(network_range)

+   # For country policies, expand to largest matching ancestor
+   # This consolidates /24 rules into /16, /8, etc. when possible
+   expanded_range = find_largest_matching_ancestor(network_range)
+
+   # Check for existing supernet rules before attempting to create
+   if expanded_range.supernet_rules.any?
+     supernet = expanded_range.supernet_rules.first
+     Rails.logger.debug "Skipping rule creation for #{expanded_range.cidr} - covered by supernet rule ##{supernet.id} (#{supernet.network_range.cidr})"
+     return nil
+   end
+
+   # Try to create the rule, handling duplicates gracefully
+   begin
    rule = Rule.create!(
-     rule_type: 'network',
-     action: policy_action,
-     network_range: network_range,
+     waf_rule_type: 'network',
+     waf_action: policy_action.to_sym,
+     network_range: expanded_range,
      waf_policy: self,
      user: user,
      source: "policy",
-     metadata: build_rule_metadata(network_range),
-     priority: network_range.prefix_length
+     metadata: build_rule_metadata(expanded_range),
+     priority: expanded_range.prefix_length
    )
+   rescue ActiveRecord::RecordNotUnique
+     # Rule already exists (created by another job or earlier in this job)
+     # Find and return the existing rule
+     Rails.logger.debug "Rule already exists for #{expanded_range.cidr} with policy #{name}"
+     return Rule.find_by(
+       waf_rule_type: 'network',
+       waf_action: policy_action,
+       network_range: expanded_range,
+       waf_policy: self,
+       source: "policy"
+     )
+   end

    # Handle redirect/challenge specific data
    if redirect_action? && additional_data['redirect_url']
@@ -203,7 +228,7 @@ validate :targets_must_be_array

    rule = Rule.create!(
      waf_rule_type: 'path_pattern',
-     waf_action: policy_action,
+     waf_action: policy_action.to_sym,
      waf_policy: self,
      user: user,
      source: "policy",
@@ -484,6 +509,64 @@ validate :targets_must_be_array
    base_metadata.merge!(additional_data || {})
  end

+  # For country policies, find the largest ancestor network that matches the same country
+  # This allows consolidating /24 rules into /16, /8, etc. when the entire block is in the same country
+  def find_largest_matching_ancestor(network_range)
+    return network_range unless country_policy?
+
+    country = network_range.country || network_range.inherited_intelligence[:country]
+    return network_range unless country
+
+    # Check if this network has IPAPI data with a larger CIDR (asn.route or ipapi_returned_cidr)
+    ipapi_cidr = network_range.network_data&.dig('ipapi', 'asn', 'route') ||
+                 network_range.network_data&.dig('ipapi_returned_cidr')
+
+    if ipapi_cidr && ipapi_cidr != network_range.cidr
+      # IPAPI returned a larger network - use it if it exists
+      existing = NetworkRange.find_by(network: ipapi_cidr)
+      if existing
+        existing_country = existing.country || existing.inherited_intelligence[:country]
+        if existing_country == country
+          Rails.logger.debug "Using IPAPI CIDR #{existing.cidr} instead of #{network_range.cidr} (both #{country})"
+          return existing
+        end
+      else
+        # Create the IPAPI network range if it doesn't exist
+        begin
+          ipapi_network = NetworkRange.create!(
+            network: ipapi_cidr,
+            source: 'inherited',
+            country: country
+          )
+          Rails.logger.info "Created IPAPI network range #{ipapi_cidr} for country #{country}"
+          return ipapi_network
+        rescue ActiveRecord::RecordNotUnique
+          # Race condition - another process created it
+          existing = NetworkRange.find_by(network: ipapi_cidr)
+          return existing || network_range
+        end
+      end
+    end
+
+    # Fallback: Look for existing parent networks with IPAPI data and same country
+    # Query for all networks that contain this network and have IPAPI data
+    parent_with_ipapi = NetworkRange.where(
+      "?::inet << network", network_range.cidr
+    ).where(
+      "network_data ? 'ipapi' AND " \
+      "network_data -> 'ipapi' ->> 'location' ->> 'country_code' = ?",
+      country
+    ).order("masklen(network) DESC").first
+
+    if parent_with_ipapi
+      Rails.logger.debug "Found existing IPAPI parent #{parent_with_ipapi.cidr} for #{network_range.cidr} (both #{country})"
+      return parent_with_ipapi
+    end
+
+    # No expansion possible - use original network
+    network_range
+  end
+
  def matched_field(network_range)
    case policy_type
    when 'country'
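The ancestor expansion above relies on PostgreSQL's inet operators: << tests strict containment and masklen() returns the prefix length, so ordering by masklen(network) DESC picks the most specific matching parent. An illustrative query with a made-up CIDR:

# Illustrative only; the CIDR literal is an example, not data from this PR.
NetworkRange.where("?::inet << network", "203.0.113.0/24")
            .order("masklen(network) DESC")
            .first
# => the most specific existing supernet of 203.0.113.0/24, if any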
@@ -2,15 +2,22 @@

# Service for managing DuckDB analytics database
# Provides fast analytical queries on events data using columnar storage
+
+# INSTALL ducklake;
+# INSTALL sqlite;
+# ATTACH 'ducklake:sqlite3:storage/ducklake.sqlite3' AS events (DATA_PATH 'storage/ducklake/events.ducklake');
+
class AnalyticsDuckdbService
  include Singleton

- DUCKDB_PATH = Rails.root.join("storage", "analytics.duckdb").to_s
  BATCH_SIZE = 10_000
+ MAX_EVENTS_PER_SYNC = 50_000 # Limit events per job run to prevent OOM

- # Execute block with connection, ensuring database and connection are closed afterward
+ # Execute block with DuckDB connection
+ # Always uses in-memory database (no file locks, no conflicts)
+ # Used for writing parquet files and querying parquet files
  def with_connection
-   db = DuckDB::Database.open(DUCKDB_PATH)
+   db = DuckDB::Database.open(":memory:")
    conn = db.connect
    yield conn
  ensure
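The INSTALL/ATTACH comment at the top of the service documents how the DuckLake catalog is expected to be wired up. A sketch of doing that inside the in-memory connection; the catalog alias is an assumption (the comment says AS events, while the search query earlier in this diff reads from a baffle schema), and the ducklake:setup task itself is not shown here.

# Sketch, not the actual ducklake:setup task.
AnalyticsDuckdbService.instance.with_connection do |conn|
  conn.execute("INSTALL ducklake; INSTALL sqlite;")
  conn.execute("ATTACH 'ducklake:sqlite3:storage/ducklake.sqlite3' AS baffle (DATA_PATH 'storage/ducklake/events.ducklake')")
  conn.query("SELECT COUNT(*) FROM baffle.events").first
end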
@@ -35,15 +42,47 @@ class AnalyticsDuckdbService
        is_proxy BOOLEAN,
        is_bot BOOLEAN,
        waf_action INTEGER,
+       request_method INTEGER,
+       response_status INTEGER,
+       rule_id BIGINT,
        request_path VARCHAR,
        user_agent VARCHAR,
        tags VARCHAR[]
      )
    SQL

+   # Create indexes for common query patterns
+   create_indexes(conn)
+
    Rails.logger.info "[DuckDB] Schema setup complete"
  end

+ # Create indexes for fast querying
+ def create_indexes(conn)
+   indexes = [
+     "CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp DESC)",
+     "CREATE INDEX IF NOT EXISTS idx_events_network_range_id ON events(network_range_id)",
+     "CREATE INDEX IF NOT EXISTS idx_events_ip_address ON events(ip_address)",
+     "CREATE INDEX IF NOT EXISTS idx_events_waf_action ON events(waf_action)",
+     "CREATE INDEX IF NOT EXISTS idx_events_country ON events(country)",
+     "CREATE INDEX IF NOT EXISTS idx_events_company ON events(company)",
+     "CREATE INDEX IF NOT EXISTS idx_events_asn ON events(asn)",
+     "CREATE INDEX IF NOT EXISTS idx_events_rule_id ON events(rule_id)",
+     "CREATE INDEX IF NOT EXISTS idx_events_is_bot ON events(is_bot)",
+     "CREATE INDEX IF NOT EXISTS idx_events_is_datacenter ON events(is_datacenter)",
+     "CREATE INDEX IF NOT EXISTS idx_events_is_vpn ON events(is_vpn)",
+     "CREATE INDEX IF NOT EXISTS idx_events_is_proxy ON events(is_proxy)"
+   ]
+
+   indexes.each do |index_sql|
+     conn.execute(index_sql)
+   end
+
+   Rails.logger.info "[DuckDB] Indexes created"
+ rescue StandardError => e
+   Rails.logger.warn "[DuckDB] Index creation warning: #{e.message}"
+ end
+
  # Get timestamp of oldest event in DuckDB
  # Returns nil if table is empty
  def oldest_event_timestamp
@@ -54,7 +93,7 @@ class AnalyticsDuckdbService
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Error getting oldest timestamp: #{e.message}"
-   nil
+   raise
  end

  # Get timestamp of newest event in DuckDB
@@ -67,7 +106,7 @@ class AnalyticsDuckdbService
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Error getting newest timestamp: #{e.message}"
-   nil
+   raise
  end

  # Get maximum event ID already synced to DuckDB
@@ -79,34 +118,39 @@ class AnalyticsDuckdbService
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Error getting max ID: #{e.message}"
-   0
+   raise
  end

- # Sync new events from PostgreSQL to DuckDB
+ # Export new events from PostgreSQL directly to timestamped Parquet file
  # Uses PostgreSQL cursor for memory-efficient streaming
- # Uses Appender API for fast bulk inserts
- # Filters by ID to avoid duplicates
- def sync_new_events(from_timestamp)
-   total_synced = 0
+ # Writes to minute/YYYYMMDDHHmmss.parquet
+ # @param from_timestamp [Time] Start timestamp to export from
+ # @param max_id [Integer] Maximum event ID already exported (to avoid duplicates)
+ # @return [Hash] { count: Integer, file_path: String, max_id: Integer }
+ def export_new_events_to_parquet(from_timestamp, max_id = 0)
+   ensure_parquet_directories
+
+   total_exported = 0
+   exported_max_id = max_id
+   timestamp = Time.current.utc.strftime("%Y%m%d%H%M%S")
+   parquet_file = PARQUET_MINUTE_PATH.join("#{timestamp}.parquet")
+
+   Rails.logger.info "[Parquet] Exporting events from #{from_timestamp}, max_id=#{max_id} to #{parquet_file}"
+   start_time = Time.current
+
    with_connection do |conn|
-     # Ensure table exists
+     # Create temporary table in memory
      setup_schema(conn)

-     # Get max ID already in DuckDB to avoid duplicates
-     max_id_result = conn.query("SELECT COALESCE(MAX(id), 0) as max_id FROM events")
-     max_id = max_id_result.first&.first || 0
-     Rails.logger.info "[DuckDB] Syncing events from #{from_timestamp}, max_id=#{max_id}"
-     start_time = Time.current
      appender = nil
      batch_count = 0

      begin
-       # Create initial appender
+       # Create appender for in-memory table
        appender = conn.appender("events")

-       # Use PostgreSQL cursor for memory-efficient streaming
+       # Stream from PostgreSQL cursor and append to DuckDB in-memory table
+       # Limit to MAX_EVENTS_PER_SYNC to prevent OOM on large backlogs
        Event.where("timestamp >= ? AND id > ?", from_timestamp, max_id)
          .select(
            :id,
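The export method is designed to be driven by a recurring job that remembers the last exported ID between runs. The job class itself is not shown in this hunk, so the wiring below is an assumption; the class name follows the (now commented-out) recurring.yml entry later in this diff, and the cache key is illustrative.

# Hypothetical wiring; persistence of max_id via Rails.cache is illustrative.
class ExportEventsToParquetJob < ApplicationJob
  def perform
    last_id = Rails.cache.fetch("parquet/last_exported_id") { 0 }
    result  = AnalyticsDuckdbService.instance.export_new_events_to_parquet(1.hour.ago, last_id)
    Rails.cache.write("parquet/last_exported_id", result[:max_id]) if result[:count].positive?
  end
end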
@@ -122,13 +166,16 @@ class AnalyticsDuckdbService
            :is_proxy,
            :is_bot,
            :waf_action,
+           :request_method,
+           :response_status,
+           :rule_id,
            :request_path,
            :user_agent,
            :tags
          )
          .order(:id)
+         .limit(MAX_EVENTS_PER_SYNC)
          .each_row(block_size: BATCH_SIZE) do |event_data|
-           # Unpack event data from cursor row (Hash from each_row)
            begin
              appender.append_row(
                event_data["id"],
@@ -144,45 +191,63 @@ class AnalyticsDuckdbService
                event_data["is_proxy"],
                event_data["is_bot"],
                event_data["waf_action"],
+               event_data["request_method"],
+               event_data["response_status"],
+               event_data["rule_id"],
                event_data["request_path"],
                event_data["user_agent"],
                event_data["tags"] || []
              )
+
+             # Track maximum exported ID
+             exported_max_id = [exported_max_id, event_data["id"]].max
            rescue StandardError => e
-             Rails.logger.error "[DuckDB] Error appending event #{event_data['id']}: #{e.message}"
-             Rails.logger.error "[DuckDB] event_data = #{event_data.inspect}"
+             Rails.logger.error "[Parquet] Error appending event #{event_data['id']}: #{e.message}"
+             Rails.logger.error "[Parquet] event_data = #{event_data.inspect}"
              raise
            end

            batch_count += 1
-           total_synced += 1
+           total_exported += 1

            # Flush and recreate appender every BATCH_SIZE events to avoid chunk overflow
            if batch_count % BATCH_SIZE == 0
              appender.close
              appender = conn.appender("events")
-             Rails.logger.info "[DuckDB] Synced batch (total: #{total_synced} events)"
+             Rails.logger.info "[Parquet] Loaded batch (total: #{total_exported} events)"
            end
          end

-       # Close final appender
+       # Close appender
        appender&.close
+
+       # Export in-memory table to parquet file
+       conn.execute(<<~SQL)
+         COPY (SELECT * FROM events ORDER BY timestamp DESC)
+         TO '#{parquet_file}' (FORMAT PARQUET, COMPRESSION ZSTD)
+       SQL

        duration = Time.current - start_time
-       rate = total_synced / duration if duration > 0
-       Rails.logger.info "[DuckDB] Sync complete: #{total_synced} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec)"
+       rate = total_exported / duration if duration > 0
+
+       # Log completion and check if there are more events to export
+       if total_exported >= MAX_EVENTS_PER_SYNC
+         Rails.logger.info "[Parquet] Export complete: #{total_exported} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec) - hit limit, more events may be pending"
+       else
+         Rails.logger.info "[Parquet] Export complete: #{total_exported} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec)"
+       end
      rescue StandardError => e
        appender&.close rescue nil # Ensure appender is closed on error
-       Rails.logger.error "[DuckDB] Error syncing events: #{e.message}"
+       Rails.logger.error "[Parquet] Error exporting events: #{e.message}"
        Rails.logger.error e.backtrace.join("\n")
        raise # Re-raise to be caught by outer rescue
      end
    end

-   total_synced
+   { count: total_exported, file_path: parquet_file.to_s, max_id: exported_max_id }
  rescue StandardError => e
-   Rails.logger.error "[DuckDB] Sync failed: #{e.message}"
-   0
+   Rails.logger.error "[Parquet] Export failed: #{e.message}"
+   raise
  end

  # Execute analytical query on DuckDB
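Each run therefore produces one ZSTD-compressed Parquet file under the minute/ directory. A quick way to sanity-check an exported file from the console; the file name below is illustrative, not one produced by this PR:

# read_parquet is a DuckDB table function; the path is made up for the example.
AnalyticsDuckdbService.instance.with_connection do |conn|
  conn.query("SELECT COUNT(*) FROM read_parquet('storage/parquet/minute/20251202101500.parquet')").first
end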
@@ -205,7 +270,7 @@ class AnalyticsDuckdbService
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Error getting event count: #{e.message}"
-   0
+   raise
  end

  # Analytics query: Total events since timestamp
@@ -290,4 +355,254 @@ class AnalyticsDuckdbService
    @connection&.close
    @connection = nil
  end

+ # ============================================================================
+ # PARQUET EXPORT SYSTEM
+ # ============================================================================
+
+ PARQUET_BASE_PATH = Rails.root.join("storage", "parquet")
+ PARQUET_MINUTE_PATH = PARQUET_BASE_PATH.join("minute")
+ PARQUET_HOURS_PATH = PARQUET_BASE_PATH.join("hours")
+ PARQUET_DAYS_PATH = PARQUET_BASE_PATH.join("days")
+ PARQUET_WEEKS_PATH = PARQUET_BASE_PATH.join("weeks")
+ WEEK_RETENTION = ENV.fetch("PARQUET_WEEK_RETENTION", 104).to_i # Keep N weeks (default: 104 = 2 years)
+
+ # One-time export of entire DuckDB to Parquet (bootstrap)
+ # Exports all data and organizes into week files
+ # Memory-efficient: processes one week at a time with new connections
+ def export_all_to_parquet
+   ensure_parquet_directories
+
+   # Get date range first, then close connection
+   min_time, max_time = with_connection do |conn|
+     result = conn.query("SELECT MIN(timestamp) as min_time, MAX(timestamp) as max_time FROM events")
+     row = result.first
+     return unless row && row[0] && row[1]
+
+     [Time.parse(row[0].to_s), Time.parse(row[1].to_s)]
+   end
+
+   Rails.logger.info "[Parquet] Exporting all events from #{min_time} to #{max_time}"
+
+   # Export week by week with separate connections (more memory efficient)
+   current_week_start = min_time.beginning_of_week
+   weeks_exported = 0
+
+   while current_week_start <= max_time
+     week_end = current_week_start.end_of_week
+
+     year = current_week_start.year
+     week_num = current_week_start.strftime("%U").to_i
+     week_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet")
+
+     # Skip if week file already exists
+     unless File.exist?(week_file)
+       Rails.logger.info "[Parquet] Exporting week #{year}-#{week_num} (#{current_week_start} to #{week_end})"
+
+       # Use separate connection per week to limit memory usage
+       with_connection do |conn|
+         # COPY directly without ORDER BY to save memory
+         # Parquet files can be sorted during queries if needed
+         conn.execute(<<~SQL)
+           COPY (
+             SELECT * FROM events
+             WHERE timestamp >= '#{current_week_start.iso8601}'
+               AND timestamp < '#{week_end.iso8601}'
+           ) TO '#{week_file}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000)
+         SQL
+       end
+
+       weeks_exported += 1
+
+       # Force garbage collection after each week to free memory
+       GC.start
+     end
+
+     current_week_start += 1.week
+   end
+
+   Rails.logger.info "[Parquet] Bootstrap complete: exported #{weeks_exported} weeks"
+ rescue StandardError => e
+   Rails.logger.error "[Parquet] Error in bootstrap export: #{e.message}"
+   Rails.logger.error e.backtrace.join("\n")
+   raise
+ end
+
+ # Consolidate completed hour's minute files into hour file
+ # @param time [Time] The hour to consolidate
+ def consolidate_hour_to_day(time)
+   ensure_parquet_directories
+
+   hour = time.utc.hour
+   day_of_year = time.utc.yday
+   hour_file = PARQUET_HOURS_PATH.join("#{hour.to_s.rjust(2, '0')}.parquet")
+   hour_temp_file = PARQUET_HOURS_PATH.join("#{hour.to_s.rjust(2, '0')}.parquet.temp")
+   day_file = PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet")
+   day_temp_file = PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet.temp")
+
+   # Find all minute files from previous hour
+   hour_prefix = time.utc.strftime("%Y%m%d%H")
+   minute_files = Dir.glob(PARQUET_MINUTE_PATH.join("#{hour_prefix}*.parquet"))
+
+   if minute_files.empty?
+     Rails.logger.info "[Parquet] No minute files found for hour #{hour_prefix}"
+     return
+   end
+
+   with_connection do |conn|
+     Rails.logger.info "[Parquet] Consolidating #{minute_files.size} minute files from hour #{hour} into day #{day_of_year}"
+
+     # Merge minute files into hour file using .temp
+     file_list = minute_files.map { |f| "'#{f}'" }.join(", ")
+     conn.execute(<<~SQL)
+       COPY (
+         SELECT * FROM read_parquet([#{file_list}])
+         ORDER BY timestamp DESC
+       ) TO '#{hour_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD)
+     SQL
+
+     # Atomic rename
+     FileUtils.mv(hour_temp_file, hour_file, force: true)
+
+     # Now merge hour file into day file
+     if File.exist?(day_file)
+       # Merge hour data into existing day file
+       conn.execute(<<~SQL)
+         COPY (
+           SELECT * FROM read_parquet(['#{day_file}', '#{hour_file}'])
+           ORDER BY timestamp DESC
+         ) TO '#{day_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD)
+       SQL
+
+       # Replace old day file with merged file
+       FileUtils.mv(day_temp_file, day_file, force: true)
+
+       # Delete hour file after merging into day
+       File.delete(hour_file)
+     else
+       # First hour of the day - just rename hour file to day file
+       FileUtils.mv(hour_file, day_file)
+     end
+
+     # Delete the minute files after successful consolidation
+     minute_files.each { |f| File.delete(f) }
+     Rails.logger.info "[Parquet] Consolidated #{minute_files.size} minute files into #{day_file}, deleted source files"
+   end
+ rescue StandardError => e
+   Rails.logger.error "[Parquet] Error consolidating hour: #{e.message}"
+   Rails.logger.error e.backtrace.join("\n")
+   raise
+ end
+
+ # Consolidate completed week into archive
+ # @param week_start [Time] The start of the week to consolidate
+ def consolidate_days_to_week(week_start)
+   ensure_parquet_directories
+
+   year = week_start.year
+   week_num = week_start.strftime("%U").to_i # Week number (00-53)
+   week_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet")
+   week_temp_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet.temp")
+
+   # Collect day files for this week (7 days)
+   day_files = (0..6).map do |offset|
+     day = week_start + offset.days
+     day_of_year = day.yday
+     PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet")
+   end.select { |f| File.exist?(f) }
+
+   return if day_files.empty?
+
+   with_connection do |conn|
+     Rails.logger.info "[Parquet] Consolidating #{day_files.size} days into week #{year}-#{week_num}"
+
+     # Merge all day files into week archive using .temp
+     file_list = day_files.map { |f| "'#{f}'" }.join(", ")
+
+     conn.execute(<<~SQL)
+       COPY (
+         SELECT * FROM read_parquet([#{file_list}])
+         ORDER BY timestamp DESC
+       ) TO '#{week_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD)
+     SQL
+
+     # Atomic rename
+     FileUtils.mv(week_temp_file, week_file, force: true)
+
+     # Delete day files after successful consolidation
+     day_files.each { |f| File.delete(f) }
+
+     Rails.logger.info "[Parquet] Consolidated week #{year}-#{week_num}, deleted #{day_files.size} day files"
+   end
+
+   # Cleanup old weeks
+   cleanup_old_weeks
+ rescue StandardError => e
+   Rails.logger.error "[Parquet] Error consolidating week: #{e.message}"
+   Rails.logger.error e.backtrace.join("\n")
+   raise
+ end
+
+ # Build list of Parquet files to query for a given time range
+ # @param start_time [Time] Start of query range
+ # @param end_time [Time] End of query range (defaults to now)
+ # @return [Array<String>] List of Parquet file paths
+ def parquet_files_for_range(start_time, end_time = Time.current)
+   files = []
+
+   # Add minute files (most recent, not yet consolidated)
+   minute_files = Dir.glob(PARQUET_MINUTE_PATH.join("*.parquet"))
+   files.concat(minute_files)
+
+   # Add hour files (consolidated but not yet in day files)
+   hour_files = Dir.glob(PARQUET_HOURS_PATH.join("*.parquet"))
+   files.concat(hour_files)
+
+   # Add relevant day files
+   day_files = Dir.glob(PARQUET_DAYS_PATH.join("*.parquet"))
+   files.concat(day_files)
+
+   # Add relevant week files based on time range
+   # For simplicity, include all weeks (DuckDB will filter)
+   week_files = Dir.glob(PARQUET_WEEKS_PATH.join("*.parquet"))
+   files.concat(week_files)
+
+   files.sort
+ end
+
+ # Query Parquet files using in-memory DuckDB (no file locks)
+ # @param block [Block] Block that receives DuckDB connection
+ def with_parquet_connection(&block)
+   # Open in-memory DuckDB database (no file locks)
+   db = DuckDB::Database.open(":memory:")
+   conn = db.connect
+
+   yield conn
+ ensure
+   conn&.close
+   db&.close
+ end
+
+ # Cleanup old week archives beyond retention period
+ def cleanup_old_weeks
+   week_files = Dir.glob(PARQUET_WEEKS_PATH.join("*.parquet")).sort.reverse
+
+   if week_files.size > WEEK_RETENTION
+     files_to_delete = week_files[WEEK_RETENTION..-1]
+     files_to_delete.each do |file|
+       File.delete(file)
+       Rails.logger.info "[Parquet] Deleted old week archive: #{file}"
+     end
+   end
+ end
+
+ private
+
+ # Ensure Parquet directory structure exists
+ def ensure_parquet_directories
+   [PARQUET_MINUTE_PATH, PARQUET_HOURS_PATH, PARQUET_DAYS_PATH, PARQUET_WEEKS_PATH].each do |path|
+     FileUtils.mkdir_p(path) unless Dir.exist?(path)
+   end
+ end
end
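Putting the tiers together, a read path could collect every candidate file and let DuckDB prune by timestamp. This is a sketch built from the methods above; the aggregation query itself is illustrative rather than code from this PR.

service = AnalyticsDuckdbService.instance
files   = service.parquet_files_for_range(7.days.ago)
service.with_parquet_connection do |conn|
  list = files.map { |f| "'#{f}'" }.join(", ")
  conn.query(<<~SQL).to_a
    SELECT waf_action, COUNT(*) AS total
    FROM read_parquet([#{list}])
    WHERE timestamp >= '#{7.days.ago.utc.iso8601}'
    GROUP BY waf_action
    ORDER BY total DESC
  SQL
end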
app/services/event_tagger.rb (new file, 116 lines)
@@ -0,0 +1,116 @@
# frozen_string_literal: true

# EventTagger - Service for applying tags to events
#
# Centralizes tagging logic to keep Event model focused on data management.
# Tags can come from multiple sources:
# 1. Agent-provided tags (from payload)
# 2. Matched rule tags (from rule.metadata['tags'])
# 3. Future: Policy-based tags, network intelligence tags, etc.
#
# Usage:
#   EventTagger.tag_event(event)              # Tag single event
#   EventTagger.tag_batch(Event.where(...))   # Efficiently tag multiple events
#   EventTagger.retag_for_rule(rule)          # Retag all events for a specific rule
class EventTagger
  # Tag a single event with rule tags
  #
  # @param event [Event] The event to tag
  # @return [Array<String>] The final array of tags applied
  def self.tag_event(event)
    tags = []

    # 1. Keep agent-provided tags (if any)
    tags += event.payload&.dig("tags") || []

    # 2. Add tags from matched rule (if any)
    if event.rule_id.present?
      rule = event.rule
      tags += rule&.tags || []
    end

    # 3. Future: Add tags from policies, network intelligence, etc.
    # tags += apply_policy_tags(event)
    # tags += apply_network_tags(event)

    # Deduplicate and update
    final_tags = tags.uniq
    event.update_column(:tags, final_tags)
    final_tags
  end

  # Efficiently tag multiple events with preloaded rules
  #
  # @param events [ActiveRecord::Relation, Array<Event>] Events to tag
  # @return [Integer] Number of events tagged
  def self.tag_batch(events)
    events = events.to_a if events.is_a?(ActiveRecord::Relation)
    return 0 if events.empty?

    # Preload rules to avoid N+1 queries
    rule_ids = events.map(&:rule_id).compact.uniq
    rules_by_id = Rule.where(id: rule_ids).index_by(&:id)

    tagged_count = 0

    events.each do |event|
      tags = event.payload&.dig("tags") || []

      # Add rule tags if rule exists
      if event.rule_id && rules_by_id[event.rule_id]
        tags += rules_by_id[event.rule_id].tags
      end

      # Update tags
      event.update_column(:tags, tags.uniq)
      tagged_count += 1
    end

    tagged_count
  end

  # Retag all events that matched a specific rule
  # Useful when a rule's tags are updated
  #
  # @param rule [Rule] The rule whose events should be retagged
  # @param limit [Integer] Maximum number of events to retag (default: no limit)
  # @return [Integer] Number of events retagged
  def self.retag_for_rule(rule, limit: nil)
    events = Event.where(rule_id: rule.id)
    events = events.limit(limit) if limit
    tag_batch(events)
  end

  # Retag all events (useful for bulk migrations or fixes)
  #
  # @param batch_size [Integer] Number of events to process at once
  # @return [Integer] Total number of events retagged
  def self.retag_all(batch_size: 1000)
    total = 0

    Event.find_in_batches(batch_size: batch_size) do |batch|
      total += tag_batch(batch)
      Rails.logger.info "[EventTagger] Retagged #{total} events..."
    end

    total
  end

  private

  # Future: Apply policy-based tags
  # def self.apply_policy_tags(event)
  #   tags = []
  #   # Check if event matches any policy conditions
  #   # Add tags based on policy matches
  #   tags
  # end

  # Future: Apply network intelligence tags
  # def self.apply_network_tags(event)
  #   tags = []
  #   # Add tags based on network_range attributes
  #   # e.g., ["datacenter", "vpn", "proxy", "country:US"]
  #   tags
  # end
end
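A plausible call site for the retag helpers, not part of this diff: re-apply tags in the background whenever a rule's metadata changes. The callback name and the limit are illustrative.

# Hypothetical integration sketch.
class Rule < ApplicationRecord
  after_update_commit :retag_events, if: -> { saved_change_to_metadata? }

  private

  def retag_events
    EventTagger.retag_for_rule(self, limit: 10_000)
  end
end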
@@ -185,7 +185,13 @@
<div class="bg-white shadow rounded-lg">
  <div class="px-6 py-4 border-b border-gray-200">
    <div class="flex items-center justify-between">
-     <h3 class="text-lg font-medium text-gray-900">Events Timeline (Last 24 Hours)</h3>
+     <h3 class="text-lg font-medium text-gray-900">Events Timeline (<%= case @time_period
+                                                                        when :hour then "Last Hour"
+                                                                        when :day then "Last 24 Hours"
+                                                                        when :week then "Last 7 Days"
+                                                                        when :month then "Last 30 Days"
+                                                                        else "Last 24 Hours"
+                                                                        end %>)</h3>
      <span class="text-sm text-gray-500">Times shown in your local timezone</span>
    </div>
  </div>
@@ -381,7 +387,7 @@
  <h3 class="text-lg font-medium text-gray-900">Quick Actions</h3>
</div>
<div class="p-6">
- <div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-4">
+ <div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-5 gap-4">
    <%= link_to new_rule_path, class: "flex items-center justify-center px-4 py-3 bg-blue-600 text-white rounded-md hover:bg-blue-700 transition-colors" do %>
      <svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
        <path d="M19 13h-6v6h-2v-6H5v-2h6V5h2v6h6v2z"/>
@@ -393,17 +399,24 @@
      <svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
        <path d="M19 13h-6v6h-2v-6H5v-2h6V5h2v6h6v2z"/>
      </svg>
-     Add Network Range
+     Add Network
    <% end %>

-   <%= link_to events_path, class: "flex items-center justify-center px-4 py-3 bg-purple-600 text-white rounded-md hover:bg-purple-700 transition-colors" do %>
+   <%= link_to analytics_networks_path, class: "flex items-center justify-center px-4 py-3 bg-purple-600 text-white rounded-md hover:bg-purple-700 transition-colors" do %>
+     <svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
+       <path d="M12 2l3.09 6.26L22 9.27l-5 4.87 1.18 6.88L12 17.77l-6.18 3.25L7 14.14 2 9.27l6.91-1.01L12 2z"/>
+     </svg>
+     Network Analytics
+   <% end %>
+
+   <%= link_to events_path, class: "flex items-center justify-center px-4 py-3 bg-orange-600 text-white rounded-md hover:bg-orange-700 transition-colors" do %>
      <svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
        <path d="M12 2C6.48 2 2 6.48 2 12s4.48 10 10 10 10-4.48 10-10S17.52 2 12 2zm1 15h-2v-2h2v2zm0-4h-2V7h2v6z"/>
      </svg>
      View Events
    <% end %>

-   <%= link_to rules_path, class: "flex items-center justify-center px-4 py-3 bg-orange-600 text-white rounded-md hover:bg-orange-700 transition-colors" do %>
+   <%= link_to rules_path, class: "flex items-center justify-center px-4 py-3 bg-gray-600 text-white rounded-md hover:bg-gray-700 transition-colors" do %>
      <svg class="w-5 h-5 mr-2" fill="currentColor" viewBox="0 0 24 24">
        <path d="M12 1L3 5v6c0 5.55 3.84 10.74 9 12 5.16-1.26 9-6.45 9-12V5l-9-4z"/>
      </svg>
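The timeline heading now switches on @time_period. The controller side is not shown in this diff; presumably it normalizes a period parameter to one of the symbols the view expects, along the lines of the following (parameter name is illustrative):

@time_period = params[:period]&.to_sym
@time_period = :day unless %i[hour day week month].include?(@time_period)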
@@ -3,6 +3,7 @@
# If running the rails server then create or migrate existing database
if [ "${@: -2:1}" == "./bin/rails" ] && [ "${@: -1:1}" == "server" ]; then
  ./bin/rails db:prepare
+ ./bin/rails ducklake:setup
fi

exec "${@}"
@@ -55,6 +55,7 @@ production:
  database: baffle_hub_production
  username: baffle_hub
  password: <%= ENV["BAFFLE_HUB_DATABASE_PASSWORD"] %>
+ pool: 80
cache:
  <<: *sqlite_default
  database: storage/production_cache.sqlite3
@@ -140,7 +140,7 @@ end

# Add application-specific context
app_version = begin
- File.read(Rails.root.join('VERSION')).strip
+ BaffleHub::VERSION
rescue
  ENV['APP_VERSION'] || ENV['GIT_COMMIT_SHA']&.[](0..7) || 'unknown'
end
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module BaffleHub
- VERSION = "0.3.0"
+ VERSION = "0.4.0"
end
@@ -4,8 +4,8 @@ default: &default
  batch_size: 500
  workers:
    - queues: "*"
-     threads: 3
-     processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
+     threads: <%= ENV.fetch("JOB_THREADS", 3) %>
+     processes: <%= ENV.fetch("JOB_PROCESSES", 1) %>
      polling_interval: 0.1

development:
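With both knobs exposed, total job concurrency is roughly JOB_PROCESSES times JOB_THREADS, which needs to stay comfortably below the production database pool of 80 configured earlier in this diff. A quick check (illustrative only):

processes = ENV.fetch("JOB_PROCESSES", 1).to_i
threads   = ENV.fetch("JOB_THREADS", 3).to_i
puts "max concurrent jobs: #{processes * threads}"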
@@ -30,8 +30,30 @@ cleanup_old_events:
  queue: background
  schedule: every hour

- # Sync events from PostgreSQL to DuckDB for fast analytics
- sync_events_to_duckdb:
-   class: SyncEventsToDuckdbJob
+ # Export events from PostgreSQL to DuckLake for fast analytics
+ export_events_to_ducklake:
+   class: ExportEventsToDucklakeJob
    queue: default
    schedule: every 1 minutes
+
+ # Merge DuckLake files and clean up immediately after
+ merge_ducklake_files:
+   class: MergeDucklakeFilesJob
+   queue: background
+   schedule: every 15 minutes
+
+ # OLD PARQUET SYSTEM (DISABLED - using DuckLake now)
+ # export_events_to_parquet:
+ #   class: ExportEventsToParquetJob
+ #   queue: default
+ #   schedule: every 1 minutes
+ #
+ # consolidate_parquet_hourly:
+ #   class: ConsolidateParquetHourlyJob
+ #   queue: default
+ #   schedule: "5 * * * *" # At 5 minutes past every hour
+ #
+ # consolidate_parquet_weekly:
+ #   class: ConsolidateParquetWeeklyJob
+ #   queue: default
+ #   schedule: "5 0 * * 1" # Monday at 00:05
@@ -0,0 +1,67 @@
# frozen_string_literal: true

class RemovePostgresEventsCountFromNetworkRanges < ActiveRecord::Migration[8.1]
  def up
    # Drop triggers first
    execute <<-SQL
      DROP TRIGGER IF EXISTS update_network_ranges_events_count_after_insert ON events;
      DROP TRIGGER IF EXISTS update_network_ranges_events_count_after_delete ON events;
      DROP FUNCTION IF EXISTS update_network_range_events_count();
    SQL

    # Remove index and column
    remove_index :network_ranges, :events_count
    remove_column :network_ranges, :events_count
  end

  def down
    # Add column back (for rollback)
    add_column :network_ranges, :events_count, :integer, null: false, default: 0
    add_index :network_ranges, :events_count

    # Recreate trigger function
    execute <<-SQL
      CREATE OR REPLACE FUNCTION update_network_range_events_count()
      RETURNS TRIGGER AS $$
      BEGIN
        -- Update all network ranges that contain IP address
        UPDATE network_ranges
        SET events_count = events_count +
          CASE
            WHEN TG_OP = 'INSERT' THEN 1
            WHEN TG_OP = 'DELETE' THEN -1
            ELSE 0
          END
        WHERE network >>= NEW.ip_address::inet;

        RETURN COALESCE(NEW, OLD);
      END;
      $$ LANGUAGE plpgsql;
    SQL

    # Recreate triggers
    execute <<-SQL
      CREATE TRIGGER update_network_ranges_events_count_after_insert
      AFTER INSERT ON events
      FOR EACH ROW
      EXECUTE FUNCTION update_network_range_events_count();
    SQL

    execute <<-SQL
      CREATE TRIGGER update_network_ranges_events_count_after_delete
      AFTER DELETE ON events
      FOR EACH ROW
      EXECUTE FUNCTION update_network_range_events_count();
    SQL

    # Backfill existing counts
    execute <<-SQL
      UPDATE network_ranges
      SET events_count = (
        SELECT COUNT(*)
        FROM events
        WHERE events.ip_address <<= network_ranges.network
      );
    SQL
  end
end
@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

-ActiveRecord::Schema[8.1].define(version: 2025_11_20_003554) do
+ActiveRecord::Schema[8.1].define(version: 2025_12_02_070000) do
  # These are extensions that must be enabled in order to support this database
  enable_extension "pg_catalog.plpgsql"

@@ -128,7 +128,6 @@ ActiveRecord::Schema[8.1].define(version: 2025_11_20_003554) do
    t.string "country"
    t.datetime "created_at", null: false
    t.text "creation_reason"
-   t.integer "events_count", default: 0, null: false
    t.boolean "is_datacenter", default: false
    t.boolean "is_proxy", default: false
    t.boolean "is_vpn", default: false
@@ -143,7 +142,6 @@ ActiveRecord::Schema[8.1].define(version: 2025_11_20_003554) do
    t.index ["asn_org"], name: "index_network_ranges_on_asn_org"
    t.index ["company"], name: "index_network_ranges_on_company"
    t.index ["country"], name: "index_network_ranges_on_country"
-   t.index ["events_count"], name: "index_network_ranges_on_events_count"
    t.index ["is_datacenter", "is_proxy", "is_vpn"], name: "idx_network_flags"
    t.index ["is_datacenter"], name: "index_network_ranges_on_is_datacenter"
    t.index ["network"], name: "index_network_ranges_on_network", opclass: :inet_ops, using: :gist
script/backfill_duckdb_new_columns.rb (new file, 152 lines)
@@ -0,0 +1,152 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

# One-time backfill script to populate new columns in existing DuckDB events
# This uses DuckDB's JOIN-based UPDATE for maximum performance

require 'csv'
require 'tempfile'

puts "DuckDB Column Backfill Script (JOIN-based UPDATE)"
puts "=" * 60
puts "This will update existing DuckDB events with data from PostgreSQL"
puts "using a fast JOIN-based approach"
puts

BATCH_SIZE = 50_000

AnalyticsDuckdbService.instance.with_connection do |conn|
  # Get total events in DuckDB
  puts "Step 1: Counting events to backfill..."
  result = conn.query("SELECT COUNT(*) FROM events WHERE request_method IS NULL")
  total_to_backfill = result.first&.first || 0

  result = conn.query("SELECT COUNT(*) FROM events")
  total_events = result.first&.first || 0

  puts "  Total events in DuckDB: #{total_events}"
  puts "  Events needing backfill: #{total_to_backfill}"

  if total_to_backfill == 0
    puts "\n✓ All events already have new columns populated!"
    exit 0
  end

  # Get min and max event IDs in DuckDB
  result = conn.query("SELECT MIN(id), MAX(id) FROM events WHERE request_method IS NULL")
  min_id, max_id = result.first
  puts "  ID range to backfill: #{min_id} to #{max_id}"

  puts "\nStep 2: Exporting PostgreSQL data in batches..."
  current_id = min_id
  batch_num = 0
  total_updated = 0

  # Create temporary CSV file for data transfer
  temp_csv = Tempfile.new(['events_backfill', '.csv'])

  begin
    CSV.open(temp_csv.path, 'w') do |csv|
      # Header
      csv << ['id', 'request_method', 'response_status', 'rule_id']

      while current_id <= max_id
        batch_num += 1
        batch_end_id = [current_id + BATCH_SIZE - 1, max_id].min

        print "  Batch #{batch_num}: Exporting IDs #{current_id}-#{batch_end_id}..."

        # Fetch from PostgreSQL
        pg_events = Event.where("id >= ? AND id <= ?", current_id, batch_end_id)
                         .select(:id, :request_method, :response_status, :rule_id)

        count = 0
        pg_events.find_each do |event|
          csv << [
            event.id,
            event.request_method,
            event.response_status,
            event.rule_id
          ]
          count += 1
        end

        puts " #{count} events"
        current_id = batch_end_id + 1
      end
    end

    temp_csv.close

    puts "\n✓ Exported to temporary CSV: #{temp_csv.path}"
    puts "  File size: #{(File.size(temp_csv.path) / 1024.0 / 1024.0).round(2)} MB"

    puts "\nStep 3: Loading CSV into temporary DuckDB table..."
    conn.execute("DROP TABLE IF EXISTS events_updates")
    conn.execute(<<~SQL)
      CREATE TABLE events_updates (
        id BIGINT,
        request_method INTEGER,
        response_status INTEGER,
        rule_id BIGINT
      )
    SQL

    conn.execute(<<~SQL)
      COPY events_updates FROM '#{temp_csv.path}' (FORMAT CSV, HEADER TRUE, NULL '')
    SQL

    result = conn.query("SELECT COUNT(*) FROM events_updates")
    loaded_count = result.first&.first || 0
    puts "✓ Loaded #{loaded_count} rows into temporary table"

    puts "\nStep 4: Performing bulk UPDATE via JOIN..."
    start_time = Time.current

    # DuckDB's efficient UPDATE...FROM syntax
    conn.execute(<<~SQL)
      UPDATE events
      SET
        request_method = events_updates.request_method,
        response_status = events_updates.response_status,
        rule_id = events_updates.rule_id
      FROM events_updates
      WHERE events.id = events_updates.id
    SQL

    duration = Time.current - start_time
    puts "✓ Bulk update complete in #{duration.round(2)}s!"

    puts "\nStep 5: Cleaning up temporary table..."
    conn.execute("DROP TABLE events_updates")
    puts "✓ Temporary table dropped"

  ensure
    # Clean up temp file
    temp_csv.unlink if temp_csv
  end

  puts "\nStep 6: Verifying backfill..."
  result = conn.query("SELECT COUNT(*) FROM events WHERE request_method IS NOT NULL OR response_status IS NOT NULL OR rule_id IS NOT NULL")
  filled_count = result.first&.first || 0

  result = conn.query("SELECT COUNT(*) FROM events WHERE request_method IS NULL AND response_status IS NULL AND rule_id IS NULL")
  still_null_count = result.first&.first || 0

  puts "  Events with new columns populated: #{filled_count}"
  puts "  Events still with NULL columns: #{still_null_count}"

  if still_null_count > 0
    puts "\n⚠ Note: #{still_null_count} events still have NULL values."
    puts "  This is normal if those events don't exist in PostgreSQL anymore"
    puts "  (they may have been cleaned up due to retention policy)"
  else
    puts "\n✓ Backfill complete! All events have new columns populated."
  end
end

puts "\n" + "=" * 60
puts "Backfill complete!"
puts "\nNext steps:"
puts "1. Test the events index page to verify everything works"
puts "2. Monitor performance improvements from DuckDB queries"
@@ -561,14 +561,23 @@ class NetworkRangeTest < ActiveSupport::TestCase
  end

  # Analytics Methods
- test "events_count returns counter cache value" do
+ test "has_events? correctly detects if network has events" do
    range = NetworkRange.create!(network: "192.168.1.0/24")

-   assert_equal 0, range.events_count
+   assert_equal false, range.has_events?

-   # Update counter cache manually for testing
-   range.update_column(:events_count, 5)
-   assert_equal 5, range.events_count
+   # Create a test event in this network
+   Event.create!(
+     request_id: "test-1",
+     ip_address: "192.168.1.100",
+     network_range: range,
+     waf_action: 1,
+     request_method: 0,
+     response_status: 200
+   )
+
+   # Should now detect events exist
+   assert_equal true, range.has_events?
  end

  test "events method finds events within range" do