More use of tags - drop add_header action -> allow + headers+tags
@@ -33,9 +33,11 @@ class AnalyticsDuckdbService
         is_datacenter BOOLEAN,
         is_vpn BOOLEAN,
         is_proxy BOOLEAN,
         is_bot BOOLEAN,
         waf_action INTEGER,
         request_path VARCHAR,
-        user_agent VARCHAR
+        user_agent VARCHAR,
+        tags VARCHAR[]
       )
     SQL
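Note on the new tags VARCHAR[] column: it holds a DuckDB LIST of strings per event, so tag filters and rollups can use DuckDB's list functions. A minimal sketch, not part of this commit, assuming conn is the same DuckDB connection the service already holds and that the gem's Connection#query accepts bound parameters:

    # Count events carrying one tag; list_contains is a DuckDB list function.
    def events_with_tag(conn, tag)
      conn.query("SELECT count(*) FROM events WHERE list_contains(tags, ?)", tag).to_a.first[0]
    end

    # Tally individual tags by exploding the list column with unnest.
    def tag_counts(conn)
      sql = <<~SQL
        SELECT tag, count(*) AS count
        FROM (SELECT unnest(tags) AS tag FROM events)
        GROUP BY tag
        ORDER BY count DESC
      SQL
      conn.query(sql).to_a.map { |row| [row[0], row[1]] }
    end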
@@ -101,6 +103,9 @@ class AnalyticsDuckdbService
     batch_count = 0

+    begin
+      # Create initial appender
+      appender = conn.appender("events")

       # Use PostgreSQL cursor for memory-efficient streaming
       Event.where("timestamp >= ? AND id > ?", from_timestamp, max_id)
         .select(
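The begin added here presumably pairs with an ensure/rescue outside these hunks; the point of the change is that one appender now exists for the whole sync instead of being created at the top of every batch iteration. The intended lifecycle, as a sketch only (stream_events is a hypothetical stand-in for the cursor loop below):

    appender = nil
    begin
      appender = conn.appender("events")   # created once up front
      stream_events do |event_data|        # hypothetical stand-in for the cursor loop
        appender.append_row(*event_data)
      end
    ensure
      appender&.close                      # closing flushes any buffered rows into DuckDB
    end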
@@ -115,18 +120,14 @@ class AnalyticsDuckdbService
           :is_datacenter,
           :is_vpn,
           :is_proxy,
           :is_bot,
           :waf_action,
           :request_path,
-          :user_agent
+          :user_agent,
+          :tags
         )
         .order(:id)
         .each_row(block_size: BATCH_SIZE) do |event_data|
-          # Create new appender for each batch
-          if batch_count % BATCH_SIZE == 0
-            appender&.close # Close previous appender
-            appender = conn.appender("events")
-          end

           # Unpack event data from cursor row (Hash from each_row)
           begin
             appender.append_row(
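The appender-per-iteration block is dropped here because the hunk at +103 now creates the appender before the loop and the hunk at +157 below recreates it after each full batch. Also worth noting: each_row streams the relation through a server-side cursor and yields string-keyed hashes (hence the "Hash from each_row" comment), which is why the append below reads event_data["tags"] rather than a symbol key. A trimmed sketch of that access pattern, with the column list shortened for illustration; how the array column is cast depends on the cursor gem in use:

    Event.where("timestamp >= ? AND id > ?", from_timestamp, max_id)
         .select(:id, :timestamp, :tags)                  # trimmed for the example
         .order(:id)
         .each_row(block_size: BATCH_SIZE) do |event_data|
      event_data["tags"]   # string-keyed access; value type depends on the cursor's casting
    end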
@@ -141,9 +142,11 @@ class AnalyticsDuckdbService
               event_data["is_datacenter"],
               event_data["is_vpn"],
               event_data["is_proxy"],
               event_data["is_bot"],
               event_data["waf_action"],
               event_data["request_path"],
-              event_data["user_agent"]
+              event_data["user_agent"],
+              event_data["tags"] || []
             )
           rescue StandardError => e
             Rails.logger.error "[DuckDB] Error appending event #{event_data['id']}: #{e.message}"
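The || [] guard keeps the value an Array (possibly empty) even when an event has no tags, so the appender never sees nil for the list column; the commit relies on the appender accepting a plain Ruby Array for the VARCHAR[] column. Trimmed sketch of the guard's effect, using an illustrative column subset rather than the literal call:

    tags = event_data["tags"] || []   # nil becomes an empty list, never NULL
    appender.append_row(
      event_data["id"],
      event_data["timestamp"],
      tags
    )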
@@ -154,8 +157,10 @@ class AnalyticsDuckdbService
           batch_count += 1
           total_synced += 1

-          # Log progress every BATCH_SIZE events
+          # Flush and recreate appender every BATCH_SIZE events to avoid chunk overflow
           if batch_count % BATCH_SIZE == 0
+            appender.close
+            appender = conn.appender("events")
             Rails.logger.info "[DuckDB] Synced batch (total: #{total_synced} events)"
           end
         end
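Closing a DuckDB appender is what flushes its buffered rows into the table, so closing and recreating it every BATCH_SIZE rows caps how much a single appender accumulates (the chunk overflow the new comment refers to). The pattern in condensed form, as a sketch rather than the literal method; rows stands in for the cursor stream:

    batch_count = 0
    appender = conn.appender("events")
    rows.each do |row|
      appender.append_row(*row)
      batch_count += 1
      next unless (batch_count % BATCH_SIZE).zero?

      appender.close                       # flush the completed batch to DuckDB
      appender = conn.appender("events")   # fresh appender, fresh internal buffer
    end
    appender.close                         # flush the final partial batch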
@@ -222,7 +227,8 @@ class AnalyticsDuckdbService
       SQL

       # Convert to hash like PostgreSQL returns
-      result.to_a.to_h { |row| [row["waf_action"], row["count"]] }
+      # DuckDB returns arrays: [waf_action, count]
+      result.to_a.to_h { |row| [row[0], row[1]] }
     end
   end
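This and the three helpers below change for the same reason: the duckdb gem yields each result row as an Array in SELECT-column order rather than a string-keyed hash like the PostgreSQL path, so lookups switch from row["..."] to positional indexes. Sketch of the row shape being assumed, with illustrative values:

    result = conn.query("SELECT waf_action, count(*) AS count FROM events GROUP BY waf_action")
    result.to_a.first                               # => e.g. [1, 42]
    result.to_a.to_h { |row| [row[0], row[1]] }     # counts keyed by waf_action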
@@ -238,7 +244,8 @@ class AnalyticsDuckdbService
         LIMIT ?
       SQL

-      result.to_a.map { |row| [row["country"], row["count"]] }
+      # DuckDB returns arrays: [country, count]
+      result.to_a.map { |row| [row[0], row[1]] }
     end
   end
@@ -254,7 +261,8 @@ class AnalyticsDuckdbService
         LIMIT ?
       SQL

-      result.to_a.map { |row| [row["ip_address"], row["count"]] }
+      # DuckDB returns arrays: [ip_address, count]
+      result.to_a.map { |row| [row[0], row[1]] }
     end
   end
@@ -272,7 +280,8 @@ class AnalyticsDuckdbService
       SQL

       # Convert to hash with Time keys like PostgreSQL
-      result.to_a.to_h { |row| [row["hour"], row["count"]] }
+      # DuckDB returns arrays: [hour, count]
+      result.to_a.to_h { |row| [row[0], row[1]] }
     end
   end
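One caveat, flagged as an assumption rather than something this commit addresses: the context comment above still promises Time keys like PostgreSQL, but row[0] is used as-is, so the key type now depends on how the hour bucket is computed and on the duckdb gem's type mapping. A defensive sketch that normalizes the key if that ever matters, assuming the query buckets with something like date_trunc('hour', ...):

    require "time"

    result.to_a.to_h do |row|
      hour = row[0]
      [hour.is_a?(Time) ? hour : Time.parse(hour.to_s), row[1]]
    end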