# Files
# baffle-hub/test/services/waf_policy_matcher_test.rb
# Dan Milne 6433f6c5bb Updates
# 2025-11-14 16:35:49 +11:00
#
# 530 lines
# 19 KiB
# Ruby

require "test_helper"
# Exercises WafPolicyMatcher: matching WafPolicy records against a
# NetworkRange, generating Rule records for the matches, and the class-level
# batch / reporting helpers.
#
# Every test starts with one NetworkRange (192.168.1.0/24, country BR,
# company "Test Company", ASN 12345, not a datacenter) and a matcher built
# around it.
#
# NOTE(review): many tests below create additional ranges that reuse the
# 192.168.1.0/24 CIDR (and sometimes the BR country / ASN 12345) of the setup
# range. If NetworkRange enforces uniqueness on `network`, those create! calls
# will raise; if it does not, class-level helpers that scan all ranges may
# also pick up the setup range and skew the expected counts. Confirm against
# the model before relying on these tests.
class WafPolicyMatcherTest < ActiveSupport::TestCase
  setup do
    @user = users(:jason)
    @network_range = NetworkRange.create!(
      network: "192.168.1.0/24",
      country: "BR",
      company: "Test Company",
      asn: 12345,
      is_datacenter: false
    )
    @matcher = WafPolicyMatcher.new(network_range: @network_range)
  end

  teardown do
    # Rules are created with both a waf_policy and a network_range (see the
    # Rule.create! calls below), and delete_all does not run dependent
    # callbacks. Remove the rules first so the parent rows are never deleted
    # while children still point at them.
    Rule.delete_all
    WafPolicy.delete_all
    NetworkRange.delete_all
  end

  # Initialization

  test "initializes with network range" do
    assert_equal @network_range, @matcher.network_range
    assert_equal [], @matcher.matching_policies
    assert_equal [], @matcher.generated_rules
  end

  test "handles nil network range" do
    matcher = WafPolicyMatcher.new(network_range: nil)

    assert_nil matcher.network_range
  end

  # Policy Matching

  test "find_matching_policies returns policies that match network range" do
    # Create policies that should match
    brazil_policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")
    test_asn_policy = WafPolicy.create_asn_policy([12345], 'challenge', @user, "Challenge ASN")
    test_company_policy = WafPolicy.create_company_policy(['Test Company'], 'redirect', @user, "Redirect Company")

    # Create policies that should not match
    us_policy = WafPolicy.create_country_policy(['US'], 'deny', @user, "Block US")
    other_asn_policy = WafPolicy.create_asn_policy([67890], 'deny', @user, "Block Other ASN")

    matching_policies = @matcher.find_matching_policies

    assert_includes matching_policies, brazil_policy
    assert_includes matching_policies, test_asn_policy
    assert_includes matching_policies, test_company_policy
    assert_not_includes matching_policies, us_policy
    assert_not_includes matching_policies, other_asn_policy
  end

  test "find_matching_policies sorts by policy type priority" do
    # Create different policy types with same creation time, so only the
    # policy type can decide the ordering.
    base_time = 1.hour.ago

    # Country policy (highest priority)
    country_policy = WafPolicy.create!(
      name: "Country Policy",
      policy_type: "country",
      targets: ["BR"],
      policy_action: "deny",
      user: @user,
      created_at: base_time
    )

    # ASN policy (second priority)
    asn_policy = WafPolicy.create!(
      name: "ASN Policy",
      policy_type: "asn",
      targets: [12345],
      policy_action: "deny",
      user: @user,
      created_at: base_time
    )

    # Company policy (third priority)
    company_policy = WafPolicy.create!(
      name: "Company Policy",
      policy_type: "company",
      targets: ["Test Company"],
      policy_action: "deny",
      user: @user,
      created_at: base_time
    )

    # Network type policy (lowest priority)
    network_type_policy = WafPolicy.create!(
      name: "Network Type Policy",
      policy_type: "network_type",
      targets: ["standard"],
      policy_action: "deny",
      user: @user,
      created_at: base_time
    )

    matching_policies = @matcher.find_matching_policies

    # Should be ordered by priority: country > asn > company > network_type
    assert_equal country_policy, matching_policies[0]
    assert_equal asn_policy, matching_policies[1]
    assert_equal company_policy, matching_policies[2]
    assert_equal network_type_policy, matching_policies[3]
  end

  test "find_matching_policies sorts by creation date for same priority" do
    # Create two country policies with different creation times
    older_policy = WafPolicy.create!(
      name: "Older Policy",
      policy_type: "country",
      targets: ["BR"],
      policy_action: "deny",
      user: @user,
      created_at: 2.hours.ago
    )
    newer_policy = WafPolicy.create!(
      name: "Newer Policy",
      policy_type: "country",
      targets: ["BR"],
      policy_action: "deny",
      user: @user,
      created_at: 1.hour.ago
    )

    matching_policies = @matcher.find_matching_policies

    # Newer policy should come first
    assert_equal newer_policy, matching_policies[0]
    assert_equal older_policy, matching_policies[1]
  end

  test "find_matching_policies skips inactive policies" do
    # Create active policy
    active_policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Active Policy")

    # Create disabled policy
    disabled_policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Disabled Policy")
    disabled_policy.update!(enabled: false)

    # Create expired policy
    expired_policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Expired Policy")
    expired_policy.update!(expires_at: 1.hour.ago)

    matching_policies = @matcher.find_matching_policies

    assert_includes matching_policies, active_policy
    assert_not_includes matching_policies, disabled_policy
    assert_not_includes matching_policies, expired_policy
  end

  test "find_matching_policies returns empty array for nil network range" do
    matcher = WafPolicyMatcher.new(network_range: nil)

    matching_policies = matcher.find_matching_policies

    assert_equal [], matching_policies
  end

  # Rule Generation
  #
  # These tests inject @matching_policies directly so that rule generation is
  # exercised independently of find_matching_policies.

  test "generate_rules creates rules for matching policies" do
    brazil_policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")
    @matcher.instance_variable_set(:@matching_policies, [brazil_policy])

    generated_rules = @matcher.generate_rules

    assert_equal 1, generated_rules.length
    rule = generated_rules.first
    assert_equal brazil_policy, rule.waf_policy
    assert_equal @network_range, rule.network_range
    assert_equal "deny", rule.waf_action
  end

  test "generate_rules handles multiple matching policies" do
    policies = [
      WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil"),
      WafPolicy.create_asn_policy([12345], 'challenge', @user, "Challenge ASN"),
      WafPolicy.create_company_policy(['Test Company'], 'redirect', @user, "Redirect Company")
    ]
    @matcher.instance_variable_set(:@matching_policies, policies)

    generated_rules = @matcher.generate_rules

    assert_equal 3, generated_rules.length
    # NOTE(review): other tests in this file read `waf_action` from generated
    # rules; confirm `action` is the intended reader here.
    assert_equal "deny", generated_rules[0].action
    assert_equal "challenge", generated_rules[1].action
    assert_equal "redirect", generated_rules[2].action
  end

  test "generate_rules returns existing rules instead of duplicates" do
    policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")

    # Create existing rule
    existing_rule = Rule.create!(
      rule_type: "network",
      action: "deny",
      network_range: @network_range,
      waf_policy: policy,
      user: @user,
      enabled: true
    )
    @matcher.instance_variable_set(:@matching_policies, [policy])

    generated_rules = @matcher.generate_rules

    assert_equal 1, generated_rules.length
    assert_equal existing_rule, generated_rules.first
  end

  test "generate_rules handles policy that fails to create rule" do
    policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")

    # Mock policy to return nil for rule creation (e.g., expired policy)
    policy.expects(:create_rule_for_network_range).with(@network_range).returns(nil)
    @matcher.instance_variable_set(:@matching_policies, [policy])

    generated_rules = @matcher.generate_rules

    assert_equal [], generated_rules
  end

  test "generate_rules returns empty array for no matching policies" do
    @matcher.instance_variable_set(:@matching_policies, [])

    generated_rules = @matcher.generate_rules

    assert_equal [], generated_rules
  end

  # Combined Operations

  test "match_and_generate_rules does both operations" do
    brazil_policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")

    result = @matcher.match_and_generate_rules

    assert_equal 1, result[:matching_policies].length
    assert_equal 1, result[:generated_rules].length
    assert_includes result[:matching_policies], brazil_policy
    assert_equal brazil_policy, result[:generated_rules].first.waf_policy
  end

  # Class Methods

  test "self.process_network_range creates matcher and processes" do
    WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")

    result = WafPolicyMatcher.process_network_range(@network_range)

    assert_equal 1, result[:matching_policies].length
    assert_equal 1, result[:generated_rules].length
  end

  test "self.evaluate_and_mark! processes and marks as evaluated" do
    WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")
    original_evaluated_at = @network_range.policies_evaluated_at

    result = WafPolicyMatcher.evaluate_and_mark!(@network_range)

    assert_equal 1, result[:matching_policies].length
    assert_equal 1, result[:generated_rules].length

    @network_range.reload
    evaluated_at = @network_range.policies_evaluated_at
    assert_not_nil evaluated_at
    # A range that has never been evaluated has no previous timestamp, and
    # comparing a time with nil raises instead of failing the assertion, so
    # only compare when there is something to compare against.
    assert_operator evaluated_at, :>, original_evaluated_at if original_evaluated_at
  end

  test "self.evaluate_and_mark! handles nil network range" do
    result = WafPolicyMatcher.evaluate_and_mark!(nil)

    assert_equal({ matching_policies: [], generated_rules: [] }, result)
  end

  test "self.batch_process_network_ranges processes multiple ranges" do
    # Create multiple network ranges
    range1 = NetworkRange.create!(network: "192.168.1.0/24", country: "BR")
    range2 = NetworkRange.create!(network: "192.168.2.0/24", country: "US")

    # Create policies
    WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")
    WafPolicy.create_country_policy(['US'], 'deny', @user, "Block US")

    results = WafPolicyMatcher.batch_process_network_ranges([range1, range2])

    assert_equal 2, results.length
    assert_equal range1, results[0][:network_range]
    assert_equal range2, results[1][:network_range]
    assert_equal 1, results[0][:matching_policies].length
    assert_equal 1, results[1][:matching_policies].length
  end

  test "self.process_ranges_without_policy_rules finds ranges needing evaluation" do
    # Create range with intelligence but no rules
    intelligent_range = NetworkRange.create!(
      network: "192.168.1.0/24",
      country: "BR",
      asn: 12345
    )

    # Create range with no intelligence
    NetworkRange.create!(
      network: "192.168.2.0/24"
    )

    # Create range with existing rules
    range_with_rules = NetworkRange.create!(
      network: "192.168.3.0/24",
      country: "US"
    )
    policy = WafPolicy.create_country_policy(['US'], 'deny', @user, "Block US")
    Rule.create!(
      rule_type: "network",
      action: "deny",
      network_range: range_with_rules,
      waf_policy: policy,
      user: @user
    )

    results = WafPolicyMatcher.process_ranges_without_policy_rules(limit: 10)

    # Should only process the intelligent range without rules
    # NOTE(review): the setup range also carries country/ASN data and has no
    # rules; confirm it is not expected to show up here as well.
    assert_equal 1, results.length
    assert_equal intelligent_range, results[0][:network_range]
  end

  test "self.reprocess_all_for_policy finds potential ranges for policy" do
    # Create country policy
    brazil_policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")

    # Create matching and non-matching ranges
    brazil_range = NetworkRange.create!(network: "192.168.1.0/24", country: "BR")
    NetworkRange.create!(network: "192.168.2.0/24", country: "US")

    results = WafPolicyMatcher.reprocess_all_for_policy(brazil_policy)

    # NOTE(review): the setup range is also country BR; confirm the expected
    # count of one.
    assert_equal 1, results.length
    assert_equal brazil_range, results[0][:network_range]
    assert_not_nil results[0][:generated_rule]
  end

  test "self.reprocess_all_for_policy handles different policy types" do
    # Test ASN policy
    asn_policy = WafPolicy.create_asn_policy([12345], 'deny', @user, "Block ASN")
    NetworkRange.create!(network: "192.168.1.0/24", asn: 12345)

    # Test company policy
    company_policy = WafPolicy.create_company_policy(['Test Corp'], 'deny', @user, "Block Company")
    NetworkRange.create!(network: "192.168.2.0/24", company: "Test Corporation")

    # Test network type policy
    network_type_policy = WafPolicy.create_network_type_policy(['datacenter'], 'deny', @user, "Block Datacenter")
    NetworkRange.create!(network: "192.168.3.0/24", is_datacenter: true)

    asn_results = WafPolicyMatcher.reprocess_all_for_policy(asn_policy)
    company_results = WafPolicyMatcher.reprocess_all_for_policy(company_policy)
    network_type_results = WafPolicyMatcher.reprocess_all_for_policy(network_type_policy)

    assert_equal 1, asn_results.length
    assert_equal 1, company_results.length
    assert_equal 1, network_type_results.length
  end

  # Statistics and Reporting

  test "self.matching_policies_for_network_range returns matching policies" do
    policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")

    matching_policies = WafPolicyMatcher.matching_policies_for_network_range(@network_range)

    assert_equal 1, matching_policies.length
    assert_includes matching_policies, policy
  end

  test "self.policy_effectiveness_stats returns correct statistics" do
    policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")

    # Create some rules for the policy: one enabled, one disabled
    range1 = NetworkRange.create!(network: "192.168.1.0/24", country: "BR")
    range2 = NetworkRange.create!(network: "192.168.2.0/24", country: "BR")
    Rule.create!(
      rule_type: "network",
      action: "deny",
      network_range: range1,
      waf_policy: policy,
      user: @user
    )
    Rule.create!(
      rule_type: "network",
      action: "deny",
      network_range: range2,
      waf_policy: policy,
      user: @user,
      enabled: false # Disabled rule
    )

    stats = WafPolicyMatcher.policy_effectiveness_stats(policy, days: 30)

    assert_equal policy.name, stats[:policy_name]
    assert_equal "country", stats[:policy_type]
    assert_equal "deny", stats[:action]
    assert_equal 2, stats[:rules_generated]
    assert_equal 1, stats[:active_rules] # Only enabled rules
    assert_equal 2, stats[:networks_protected]
    assert_equal 30, stats[:period_days]
    # The rate is a float; compare with a tolerance rather than exact equality.
    assert_in_delta 2.0 / 30, stats[:generation_rate], 1e-9
  end

  # Network Intelligence Matching

  test "matches country policies based on network range country" do
    range_with_country = NetworkRange.create!(network: "192.168.1.0/24", country: "BR")
    range_without_country = NetworkRange.create!(network: "192.168.2.0/24")
    brazil_policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")

    matcher1 = WafPolicyMatcher.new(network_range: range_with_country)
    matcher2 = WafPolicyMatcher.new(network_range: range_without_country)

    matching1 = matcher1.find_matching_policies
    matching2 = matcher2.find_matching_policies

    assert_includes matching1, brazil_policy
    assert_not_includes matching2, brazil_policy
  end

  test "matches network type policies based on intelligence flags" do
    dc_range = NetworkRange.create!(network: "192.168.1.0/24", is_datacenter: true)
    proxy_range = NetworkRange.create!(network: "192.168.2.0/24", is_proxy: true)
    standard_range = NetworkRange.create!(network: "192.168.3.0/24") # All flags false

    dc_policy = WafPolicy.create_network_type_policy(['datacenter'], 'deny', @user, "Block Datacenter")
    proxy_policy = WafPolicy.create_network_type_policy(['proxy'], 'deny', @user, "Block Proxy")
    standard_policy = WafPolicy.create_network_type_policy(['standard'], 'deny', @user, "Block Standard")

    dc_matcher = WafPolicyMatcher.new(network_range: dc_range)
    proxy_matcher = WafPolicyMatcher.new(network_range: proxy_range)
    standard_matcher = WafPolicyMatcher.new(network_range: standard_range)

    assert_includes dc_matcher.find_matching_policies, dc_policy
    assert_includes proxy_matcher.find_matching_policies, proxy_policy
    assert_includes standard_matcher.find_matching_policies, standard_policy
  end

  # Inheritance Support

  test "matches policies based on inherited intelligence" do
    # Create parent network with intelligence
    # The parent itself is never referenced again; it only needs to exist.
    NetworkRange.create!(
      network: "192.168.0.0/16",
      country: "BR",
      company: "Test Corp"
    )

    # Create child network without its own intelligence
    child = NetworkRange.create!(network: "192.168.1.0/24")

    brazil_policy = WafPolicy.create_country_policy(['BR'], 'deny', @user, "Block Brazil")
    company_policy = WafPolicy.create_company_policy(['Test Corp'], 'challenge', @user, "Challenge Corp")

    matcher = WafPolicyMatcher.new(network_range: child)
    matching_policies = matcher.find_matching_policies

    # Should match based on inherited intelligence
    assert_includes matching_policies, brazil_policy
    assert_includes matching_policies, company_policy
  end

  # Performance and Edge Cases

  test "handles large numbers of policies efficiently" do
    # Create many policies
    100.times do |i|
      WafPolicy.create_country_policy(
        ["US"], "deny", @user, "Policy #{i}"
      )
    end

    # None should match: the range under test is BR and every policy targets US.
    assert_empty @matcher.find_matching_policies
  end

  test "handles policies with complex additional_data" do
    redirect_policy = WafPolicy.create!(
      name: "Complex Redirect",
      policy_type: "country",
      targets: ["BR"],
      policy_action: "redirect",
      user: @user,
      additional_data: {
        "redirect_url" => "https://example.com/blocked",
        "redirect_status" => 301,
        "custom_headers" => {
          "X-Block-Reason" => "Country blocked"
        }
      }
    )

    rule = redirect_policy.create_rule_for_network_range(@network_range)

    assert_not_nil rule
    assert_equal "redirect", rule.waf_action
    assert rule.metadata['redirect_url'].present?
  end

  test "handles company name case-insensitive matching" do
    range = NetworkRange.create!(
      network: "192.168.1.0/24",
      company: "Google LLC"
    )

    # Policies with different case variations
    policy1 = WafPolicy.create_company_policy(['google'], 'deny', @user, "Block Google")
    policy2 = WafPolicy.create_company_policy(['GOOGLE LLC'], 'deny', @user, "Block Google LLC")
    policy3 = WafPolicy.create_company_policy(['Microsoft'], 'deny', @user, "Block Microsoft")

    matcher = WafPolicyMatcher.new(network_range: range)
    matching_policies = matcher.find_matching_policies

    assert_includes matching_policies, policy1
    assert_includes matching_policies, policy2
    assert_not_includes matching_policies, policy3
  end

  test "handles partial company name matching" do
    range = NetworkRange.create!(
      network: "192.168.1.0/24",
      company: "Amazon Web Services"
    )

    # Policy with partial match
    policy = WafPolicy.create_company_policy(['Amazon'], 'deny', @user, "Block Amazon")

    matcher = WafPolicyMatcher.new(network_range: range)
    matching_policies = matcher.find_matching_policies

    assert_includes matching_policies, policy
  end
end