Update DuckDB, use more DuckDB, and fix the display of stats
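This change moves EventDdb off the read_parquet glob view (and the per-range Parquet file scanning in search) and instead queries the DuckLake-managed baffle.events table through AnalyticsDucklakeService; it also adds a request_path filter to search. As a rough sketch of the access pattern the new code relies on — only the baffle catalog name, the setup_schema call, and the service class appear in this diff; the gem calls, metadata path, and DATA_PATH below are illustrative assumptions:

    require 'duckdb'

    # Hypothetical equivalent of what AnalyticsDucklakeService#with_connection is assumed to set up.
    db   = DuckDB::Database.open   # in-memory DuckDB
    conn = db.connect
    conn.query("INSTALL ducklake")
    conn.query("LOAD ducklake")
    # DuckLake tracks table metadata in its own catalog and manages the Parquet
    # files underneath, so no CREATE VIEW over a file glob is needed anymore.
    conn.query("ATTACH 'ducklake:storage/baffle.ducklake' AS baffle (DATA_PATH 'storage/baffle_data/')")
    conn.query("SELECT COUNT(*) FROM baffle.events").each { |row| puts row.first }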
@@ -2,9 +2,9 @@
 require 'ostruct'
 
-# EventDdb - DuckDB-backed analytics queries for events
-# Provides an ActiveRecord-like interface for querying DuckDB events table
-# Falls back to PostgreSQL Event model if DuckDB is unavailable
+# EventDdb - DuckLake-backed analytics queries for events
+# Provides an ActiveRecord-like interface for querying DuckLake events table
+# Falls back to PostgreSQL Event model if DuckLake is unavailable
 class EventDdb
   # Enum mappings from integer to string (matching Event model)
   ACTION_MAP = {
@@ -26,30 +26,24 @@ class EventDdb
   }.freeze

   class << self
-    # Get DuckDB service
+    # Get DuckLake service
     def service
-      AnalyticsDuckdbService.instance
+      AnalyticsDucklakeService.new
     end

-    # Helper to load parquet files into in-memory events view
+    # Helper to work with DuckLake events table
     # This allows all existing queries to work without modification
-    # Uses glob pattern to read all parquet files (excluding .temp files)
     def with_events_from_parquet(&block)
       service.with_connection do |conn|
-        # Create events view from all parquet files using glob pattern
-        # Pattern matches: minute/*.parquet, hours/*.parquet, days/*.parquet, weeks/*.parquet
-        # Excludes .temp files automatically (they don't match *.parquet)
-        parquet_pattern = "#{AnalyticsDuckdbService::PARQUET_BASE_PATH}/**/*.parquet"
-
-        conn.execute(<<~SQL)
-          CREATE VIEW events AS
-          SELECT * FROM read_parquet('#{parquet_pattern}')
-        SQL
-
+        # Ensure schema exists
+        service.setup_schema(conn)
+
+        # Use the DuckLake events table directly
+        # DuckLake automatically manages the Parquet files underneath
         yield conn
       end
     rescue StandardError => e
-      Rails.logger.error "[EventDdb] Error loading parquet files: #{e.message}"
+      Rails.logger.error "[EventDdb] Error accessing DuckLake events: #{e.message}"
       nil
     end
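Note: the reworked helper above keeps its old contract — it yields a connection, and it returns nil when the DuckLake query fails. A minimal sketch of the intended call pattern (the aggregate query is illustrative; only conn.query, the baffle.events table, and the nil-on-error behaviour come from this diff):

    # Hypothetical caller; the column names match the SELECT list used later in this file.
    top_countries = EventDdb.with_events_from_parquet do |conn|
      conn.query(<<~SQL).to_a
        SELECT country, COUNT(*) AS event_count
        FROM baffle.events
        GROUP BY country
        ORDER BY event_count DESC
        LIMIT 10
      SQL
    end || []  # nil (DuckLake unavailable) becomes an empty result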
@@ -691,70 +685,14 @@ class EventDdb
     # Search events with filters and pagination
     # Returns { total_count:, events:[], page:, per_page: }
-    # Supports filters: ip, waf_action, country, rule_id, company, asn, network_type, network_range_id, exclude_bots
+    # Supports filters: ip, waf_action, country, rule_id, company, asn, network_type, network_range_id, exclude_bots, request_path
     def search(filters = {}, page: 1, per_page: 50)
-      # Get list of Parquet files to query
-      parquet_files = service.parquet_files_for_range(1.year.ago, Time.current)
-
-      if parquet_files.empty?
-        Rails.logger.warn "[EventDdb] No Parquet files found, falling back to DuckDB"
-        return search_duckdb(filters, page, per_page)
-      end
-
-      # Query Parquet files using in-memory DuckDB (no file locks!)
-      service.with_parquet_connection do |conn|
-        # Build WHERE clause
-        where_clause, params = build_where_clause(filters)
-
-        # Build file list for read_parquet
-        file_list = parquet_files.map { |f| "'#{f}'" }.join(", ")
-
-        # Get total count
-        count_sql = "SELECT COUNT(*) FROM read_parquet([#{file_list}])#{where_clause}"
-        count_result = conn.query(count_sql, *params)
-        total_count = count_result.first&.first || 0
-
-        # Get paginated results
-        offset = (page - 1) * per_page
-
-        data_sql = <<~SQL
-          SELECT
-            id, timestamp, ip_address, network_range_id, country, company,
-            asn, asn_org, is_datacenter, is_vpn, is_proxy, is_bot,
-            waf_action, request_method, response_status, rule_id,
-            request_path, user_agent, tags
-          FROM read_parquet([#{file_list}])
-          #{where_clause}
-          ORDER BY timestamp DESC
-          LIMIT ? OFFSET ?
-        SQL
-
-        result = conn.query(data_sql, *params, per_page, offset)
-
-        # Convert rows to event-like objects
-        events = result.to_a.map { |row| row_to_event(row) }
-
-        {
-          total_count: total_count,
-          events: events,
-          page: page,
-          per_page: per_page
-        }
-      end
-    rescue StandardError => e
-      Rails.logger.error "[EventDdb] Error in Parquet search: #{e.message}"
-      Rails.logger.error e.backtrace.join("\n")
-      nil
-    end
-
-    # Fallback to querying DuckDB directly (for backward compatibility)
-    def search_duckdb(filters = {}, page: 1, per_page: 50)
       with_events_from_parquet do |conn|
         # Build WHERE clause
         where_clause, params = build_where_clause(filters)

         # Get total count
-        count_sql = "SELECT COUNT(*) FROM events#{where_clause}"
+        count_sql = "SELECT COUNT(*) FROM baffle.events#{where_clause}"
         count_result = conn.query(count_sql, *params)
         total_count = count_result.first&.first || 0
@@ -767,7 +705,7 @@ class EventDdb
             asn, asn_org, is_datacenter, is_vpn, is_proxy, is_bot,
             waf_action, request_method, response_status, rule_id,
             request_path, user_agent, tags
-          FROM events
+          FROM baffle.events
           #{where_clause}
           ORDER BY timestamp DESC
           LIMIT ? OFFSET ?
@@ -786,7 +724,7 @@ class EventDdb
         }
       end
     rescue StandardError => e
-      Rails.logger.error "[EventDdb] Error in DuckDB search: #{e.message}"
+      Rails.logger.error "[EventDdb] Error in DuckLake search: #{e.message}"
       Rails.logger.error e.backtrace.join("\n")
       nil
     end
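Note: with the old Parquet path and the search_duckdb fallback folded into search, callers now hit DuckLake directly. A hedged usage sketch of the updated method, including the new request_path filter — the filter keys and the result shape come from the comments above; the concrete values, the controller-style handling, and the assumption that each event exposes the selected columns as attributes (the file requires 'ostruct') are illustrative:

    result = EventDdb.search(
      { country: "US", exclude_bots: true, request_path: "/login" },
      page: 1,
      per_page: 50
    )

    if result
      result[:events].each do |event|
        puts "#{event.timestamp} #{event.ip_address} #{event.waf_action} #{event.request_path}"
      end
      puts "#{result[:total_count]} matching events"
    else
      # nil means the DuckLake query failed; callers fall back to the PostgreSQL Event model
    end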
@@ -852,6 +790,12 @@ class EventDdb
         end
       end

+      # Path filtering
+      if filters[:request_path].present?
+        conditions << "request_path = ?"
+        params << filters[:request_path]
+      end
+
       # Bot filtering
       if filters[:exclude_bots] == true || filters[:exclude_bots] == "true"
         conditions << "is_bot = false"
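For reference, a sketch of how the new request_path condition is assumed to reach the SQL that search builds — the diff only shows the condition and bind value being collected; the joining logic is inferred from the count_sql interpolation above:

    # Illustrative only: conditions/params as build_where_clause would collect them.
    conditions = ["request_path = ?", "is_bot = false"]
    params     = ["/login"]

    where_clause = conditions.any? ? " WHERE #{conditions.join(' AND ')}" : ""
    sql = "SELECT COUNT(*) FROM baffle.events#{where_clause}"
    # => "SELECT COUNT(*) FROM baffle.events WHERE request_path = ? AND is_bot = false"
    # conn.query(sql, *params) then binds "/login" to the request_path placeholder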