Add 'tags' to event model. Add a data import system - currently for MaxMind zip files

This commit is contained in:
Dan Milne
2025-11-11 10:31:36 +11:00
parent 772fae7e8b
commit 26216da9ca
34 changed files with 3580 additions and 14 deletions

app/models/data_import.rb (new file)

@@ -0,0 +1,96 @@
class DataImport < ApplicationRecord
  has_one_attached :file

  validates :import_type, presence: true, inclusion: { in: %w[asn country] }
  validates :status, presence: true, inclusion: { in: %w[pending processing completed failed] }
  validates :filename, presence: true

  attribute :import_stats, default: -> { {} }

  # Scopes
  scope :recent, -> { order(created_at: :desc) }
  scope :by_type, ->(type) { where(import_type: type) }
  scope :by_status, ->(status) { where(status: status) }
  scope :completed, -> { where(status: 'completed') }
  scope :failed, -> { where(status: 'failed') }
  scope :processing, -> { where(status: 'processing') }
  scope :pending, -> { where(status: 'pending') }

  # State management
  def pending?
    status == 'pending'
  end

  def processing?
    status == 'processing'
  end

  def completed?
    status == 'completed'
  end

  def failed?
    status == 'failed'
  end

  def start_processing!
    update!(
      status: 'processing',
      started_at: Time.current
    )
  end

  def complete!
    updates = {
      status: 'completed',
      completed_at: Time.current
    }
    updates[:total_records] = processed_records if total_records.zero?
    update!(updates)
  end

  def fail!(error_message = nil)
    update!(
      status: 'failed',
      completed_at: Time.current,
      error_message: error_message
    )
  end

  def progress_percentage
    if total_records.zero?
      processing? ? 0.1 : 0 # Show minimal progress for processing jobs
    else
      (processed_records.to_f / total_records * 100).round(2)
    end
  end

  def duration
    return 0 unless started_at

    end_time = completed_at || Time.current
    duration_seconds = (end_time - started_at).round(2)
    duration_seconds.negative? ? 0 : duration_seconds
  end

  def records_per_second
    # Handle very fast imports that complete in less than 1 second
    if duration.zero?
      # Use time since started if no duration available yet
      time_elapsed = started_at ? (Time.current - started_at) : 0
      return 0 if time_elapsed < 1

      (processed_records.to_f / time_elapsed).round(2)
    else
      (processed_records.to_f / duration).round(2)
    end
  end

  def update_progress(processed: nil, failed: nil, total_records: nil, stats: nil)
    # Check for nil rather than truthiness so an explicit 0 is still persisted
    updates = {}
    updates[:processed_records] = processed unless processed.nil?
    updates[:failed_records] = failed unless failed.nil?
    updates[:total_records] = total_records unless total_records.nil?
    updates[:import_stats] = stats unless stats.nil?
    update!(updates) if updates.any?
  end
end
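For context, a minimal sketch of how a background job might drive this lifecycle; the job class and the each_record/upsert_record helpers below are hypothetical stand-ins, not part of this commit:

class MaxmindImportJob < ApplicationJob
  def perform(data_import_id)
    import = DataImport.find(data_import_id)
    import.start_processing!

    processed = 0
    import.file.open do |zip|          # ActiveStorage attachment from has_one_attached :file
      each_record(zip) do |record|     # hypothetical: iterate rows in the MaxMind zip
        upsert_record(record)          # hypothetical: write one row
        processed += 1
        # Flush progress periodically so progress_percentage stays live in the UI
        import.update_progress(processed: processed) if (processed % 1000).zero?
      end
    end

    import.update_progress(processed: processed)
    import.complete!
  rescue StandardError => e
    import&.fail!(e.message)
    raise
  end
end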

app/models/event.rb

@@ -25,6 +25,10 @@ class Event < ApplicationRecord
  # Serialize segment IDs as array for easy manipulation in Rails
  serialize :request_segment_ids, type: Array, coder: JSON

  # Tags are stored as JSON arrays with PostgreSQL jsonb type
  # This provides direct array access and efficient indexing
  attribute :tags, :json, default: -> { [] }

  validates :event_id, presence: true, uniqueness: true
  validates :timestamp, presence: true
@@ -36,6 +40,21 @@ class Event < ApplicationRecord
  scope :allowed, -> { where(waf_action: :allow) }
  scope :rate_limited, -> { where(waf_action: 'rate_limit') }

  # Tag-based filtering scopes using PostgreSQL jsonb containment/existence.
  # Assumes tags is a jsonb column (see the attribute declaration above);
  # the native-array operators (tags @> ARRAY[?], tags && ARRAY[?]) do not
  # apply to jsonb, so the jsonb equivalents are used instead.
  scope :with_tag, ->(tag) { where("tags @> ?::jsonb", [tag.to_s].to_json) }
  scope :with_any_tags, ->(tags) {
    return none if tags.blank?
    tag_array = Array(tags).map(&:to_s)
    # jsonb_exists_any backs the ?| operator; the function form avoids
    # the clash between "?" and Rails bind parameters
    where("jsonb_exists_any(tags, ARRAY[?])", tag_array)
  }
  scope :with_all_tags, ->(tags) {
    return none if tags.blank?
    tag_array = Array(tags).map(&:to_s)
    where("tags @> ?::jsonb", tag_array.to_json)
  }

  # Network-based filtering scopes
  scope :by_company, ->(company) {
    joins("JOIN network_ranges ON events.ip_address <<= network_ranges.network")
@@ -234,7 +253,8 @@ class Event < ApplicationRecord
  end

  def tags
-    payload&.dig("tags") || {}
+    # Use the dedicated tags column (array); fall back to payload during the transition
+    super.presence || (payload&.dig("tags") || [])
  end
  def headers

@@ -281,6 +301,25 @@ class Event < ApplicationRecord
    URI.parse(request_url).hostname rescue nil
  end

  # Tag helper methods
  def add_tag(tag)
    tag_str = tag.to_s
    self.tags = (tags + [tag_str]).uniq unless tags.include?(tag_str)
  end

  def remove_tag(tag)
    tag_str = tag.to_s
    self.tags = tags - [tag_str] if tags.include?(tag_str)
  end

  def has_tag?(tag)
    tags.include?(tag.to_s)
  end

  def tag_list
    tags.join(', ')
  end

  # Normalize headers to lower case keys during import phase
  def normalize_headers(headers)
    return {} unless headers.is_a?(Hash)
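Assuming the jsonb-backed tags column described above, the new scopes and helpers compose like this (tag names are illustrative):

# Querying
Event.with_tag(:bot)                     # events carrying the "bot" tag
Event.with_any_tags(%w[bot scanner])     # at least one of the tags
Event.with_all_tags(%w[bot datacenter])  # every listed tag

# Mutating helpers work on the in-memory array; persist with save
event = Event.rate_limited.first
event.add_tag(:reviewed)
event.has_tag?(:reviewed)  # => true
event.remove_tag(:bot)
event.save!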

app/models/network_range.rb

@@ -7,7 +7,7 @@
# and classification flags (datacenter, proxy, VPN).
class NetworkRange < ApplicationRecord
  # Sources for network range creation
-  SOURCES = %w[api_imported user_created manual auto_generated inherited].freeze
+  SOURCES = %w[api_imported user_created manual auto_generated inherited geolite_asn geolite_country].freeze

  # Associations
  has_many :rules, dependent: :destroy
@@ -29,6 +29,9 @@ class NetworkRange < ApplicationRecord
  scope :vpn, -> { where(is_vpn: true) }
  scope :user_created, -> { where(source: 'user_created') }
  scope :api_imported, -> { where(source: 'api_imported') }
  scope :geolite_imported, -> { where(source: ['geolite_asn', 'geolite_country']) }
  scope :geolite_asn, -> { where(source: 'geolite_asn') }
  scope :geolite_country, -> { where(source: 'geolite_country') }

  scope :with_events, -> { where("events_count > 0") }
  scope :most_active, -> { order(events_count: :desc) }
@@ -295,4 +298,44 @@ class NetworkRange < ApplicationRecord
      # The inherited_intelligence method will pick up the new parent data
    end
  end

  # Import-related class methods
  def self.import_stats_by_source
    group(:source)
      .select(:source, 'COUNT(*) as count', 'MIN(created_at) as first_import', 'MAX(updated_at) as last_update')
      .order(:source)
  end

  def self.geolite_coverage_stats
    {
      total_networks: geolite_imported.count,
      asn_networks: geolite_asn.count,
      country_networks: geolite_country.count,
      with_asn_data: geolite_imported.where.not(asn: nil).count,
      with_country_data: geolite_imported.where.not(country: nil).count,
      with_proxy_data: geolite_imported.where(is_proxy: true).count,
      unique_countries: geolite_imported.distinct.count(:country),
      unique_asns: geolite_imported.distinct.count(:asn),
      ipv4_networks: geolite_imported.ipv4.count,
      ipv6_networks: geolite_imported.ipv6.count
    }
  end

  def self.find_by_ip_or_network(query)
    return none if query.blank?

    # IPAddr.new accepts both bare IPs ("1.2.3.4") and CIDR strings ("10.0.0.0/8"),
    # so a rescue alone cannot distinguish the two cases; check for a prefix instead.
    IPAddr.new(query)
    if query.include?('/')
      # CIDR input: match the exact stored network
      where(network: query)
    else
      # Bare IP: return every range that contains it
      where("network >>= ?", query)
    end
  rescue IPAddr::InvalidAddressError
    none
  end
end
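Illustrative console usage for the new class methods (the addresses below are from the TEST-NET documentation range):

NetworkRange.geolite_coverage_stats                   # hash of counts for imported GeoLite data
NetworkRange.find_by_ip_or_network("203.0.113.7")     # all ranges containing this IP
NetworkRange.find_by_ip_or_network("203.0.113.0/24")  # exact stored CIDR match
NetworkRange.find_by_ip_or_network("not-an-ip")       # => empty relation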

app/models/waf_policy.rb

@@ -122,7 +122,7 @@ validate :targets_must_be_array
        network_range: network_range,
        waf_policy: self,
        user: user,
-        source: "policy:#{name}",
+        source: "policy",
        metadata: build_rule_metadata(network_range),
        priority: network_range.prefix_length
      )
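With the source now a constant string, rules generated by any policy can be selected with a simple equality match; previously that would have required a prefix match on "policy:". For example (assuming the Rule model behind the has_many :rules association above):

Rule.where(source: "policy")   # all policy-generated rules, regardless of policy name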