# frozen_string_literal: true

require "test_helper"

# Custom test class that avoids fixture loading issues
# Exercises country-based WAF policies: setup creates a deny policy for Brazil,
# each test ingests one or more WAF events, runs ProcessWafPoliciesJob for the
# relevant network range, and checks which Rule records exist afterwards.
class WafPolicyBrazilTest < Minitest::Test
  # Clears the tables the tests touch, then creates the user, the
  # "Block Brazil" deny policy and the sample Brazilian event payload.
  def setup
    # Clean up any existing data
    purge_records

    @user = User.create!(email_address: "test@example.com", password: "password")

    # Create a WAF policy to block Brazil
    @brazil_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'deny',
      user: @user,
      name: "Block Brazil"
    )

    # Sample event data for a Brazilian IP
    @brazil_ip = "177.104.144.0" # Known Brazilian IP
    @brazil_event_data = build_event_data(ip: @brazil_ip, country_code: "BR", city: "São Paulo")
  end

  # Removes everything the test created so later tests start from empty tables.
  def teardown
    purge_records
  end

  def test_brazil_waf_policy_generates_block_rule_when_brazilian_event_is_processed
    # Process the Brazilian event
    event = Event.create_from_waf_payload!("brazil-test", @brazil_event_data)
    assert event.persisted?

    # Extract country code from payload geo data
    country_code = event.payload.dig("geo", "country_code")
    assert_equal "BR", country_code
    assert_equal @brazil_ip, event.ip_address.to_s

    # Ensure network range exists for the Brazilian IP
    network_range = NetworkRangeGenerator.find_or_create_for_ip(@brazil_ip)
    assert network_range.persisted?
    assert network_range.contains_ip?(@brazil_ip)

    # Set the country on the network range to simulate geo-lookup
    network_range.update!(country: 'BR')

    # Process WAF policies for this network range
    ProcessWafPoliciesJob.perform_now(network_range: network_range, event: event)

    # Verify that a blocking rule was generated
    generated_rules = deny_rules_for(network_range, @brazil_policy)
    assert_equal 1, generated_rules.count, "Should have generated exactly one blocking rule"

    rule = generated_rules.first
    assert_equal 'deny', rule.action
    assert_equal network_range, rule.network_range
    assert_equal @brazil_policy, rule.waf_policy
    assert_equal "policy", rule.source
    assert rule.enabled?, "Generated rule should be enabled"

    # Verify rule metadata contains policy information
    metadata = rule.metadata
    assert_equal @brazil_policy.id, metadata['generated_by_policy']
    assert_equal "Block Brazil", metadata['policy_name']
    assert_equal "country", metadata['policy_type']
    assert_equal "country", metadata['matched_field']
    assert_equal "BR", metadata['matched_value']
  end

  def test_non_brazilian_event_does_not_generate_block_rule_from_brazil_policy
    # Create event data for a US IP
    us_ip = "8.8.8.8" # Known US IP
    us_event_data = build_event_data(ip: us_ip, country_code: "US", city: "Mountain View")
    # NOTE(review): the base payload carries "request_id"; this extra
    # "event_id" key is kept exactly as before - confirm which key ingestion reads.
    us_event_data["event_id"] = "us-test-123"

    # Process the US event
    event = Event.create_from_waf_payload!("us-test", us_event_data)
    assert event.persisted?

    # Extract country code from payload geo data
    country_code = event.payload.dig("geo", "country_code")
    assert_equal "US", country_code
    assert_equal us_ip, event.ip_address.to_s

    # Ensure network range exists for the US IP
    network_range = NetworkRangeGenerator.find_or_create_for_ip(us_ip)
    assert network_range.persisted?
    network_range.update!(country: 'US')

    # Process WAF policies for this network range
    ProcessWafPoliciesJob.perform_now(network_range: network_range, event: event)

    # Verify that no blocking rule was generated
    generated_rules = deny_rules_for(network_range, @brazil_policy)
    assert_equal 0, generated_rules.count, "Should not have generated any blocking rules for US IP"
  end

  def test_multiple_country_policies_generate_rules_for_matching_countries_only
    # Create additional policy to block China
    china_policy = WafPolicy.create_country_policy(
      ['CN'],
      policy_action: 'deny',
      user: @user,
      name: "Block China"
    )

    # Create Chinese IP event. The payload is built fresh so that
    # @brazil_event_data (used below) still describes the Brazilian request.
    china_ip = "220.181.38.148" # Known Chinese IP
    china_event_data = build_event_data(ip: china_ip, country_code: "CN", city: "Beijing")
    # NOTE(review): "event_id" vs. the base payload's "request_id" - kept as before.
    china_event_data["event_id"] = "china-test-123"

    # Process Chinese event
    china_event = Event.create_from_waf_payload!("china-test", china_event_data)
    china_network_range = network_range_for(china_ip, country: 'CN')

    # Process Brazilian event (from setup)
    brazil_event = Event.create_from_waf_payload!("brazil-test", @brazil_event_data)
    brazil_network_range = network_range_for(@brazil_ip, country: 'BR')

    # Process WAF policies for both network ranges
    ProcessWafPoliciesJob.perform_now(network_range: brazil_network_range, event: brazil_event)
    ProcessWafPoliciesJob.perform_now(network_range: china_network_range, event: china_event)

    # Verify Brazil IP matched Brazil policy only
    brazil_rules = Rule.where(network_range: brazil_network_range)
    assert_equal 1, brazil_rules.count
    brazil_rule = brazil_rules.first
    assert_equal @brazil_policy, brazil_rule.waf_policy
    assert_equal "BR", brazil_rule.metadata['matched_value']

    # Verify China IP matched China policy only
    china_rules = Rule.where(network_range: china_network_range)
    assert_equal 1, china_rules.count
    china_rule = china_rules.first
    assert_equal china_policy, china_rule.waf_policy
    assert_equal "CN", china_rule.metadata['matched_value']
  end

  def test_policy_expiration_prevents_rule_generation
    # Create an expired Brazil policy
    expired_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'deny',
      user: @user,
      name: "Expired Brazil Block",
      expires_at: 1.day.ago
    )

    # Process Brazilian event
    event = Event.create_from_waf_payload!("expired-test", @brazil_event_data)
    network_range = network_range_for(@brazil_ip, country: 'BR')

    # Process WAF policies
    ProcessWafPoliciesJob.perform_now(network_range: network_range, event: event)

    # Verify that no rule was generated from expired policy
    generated_rules = deny_rules_for(network_range, expired_policy)
    assert_equal 0, generated_rules.count, "Expired policy should not generate rules"
  end

  def test_disabled_policy_prevents_rule_generation
    # Create a disabled Brazil policy
    disabled_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'deny',
      user: @user,
      name: "Disabled Brazil Block"
    )
    disabled_policy.update!(enabled: false)

    # Process Brazilian event
    event = Event.create_from_waf_payload!("disabled-test", @brazil_event_data)
    network_range = network_range_for(@brazil_ip, country: 'BR')

    # Process WAF policies
    ProcessWafPoliciesJob.perform_now(network_range: network_range, event: event)

    # Verify that no rule was generated from disabled policy
    generated_rules = deny_rules_for(network_range, disabled_policy)
    assert_equal 0, generated_rules.count, "Disabled policy should not generate rules"
  end

  def test_policy_action_types_are_correctly_applied_to_generated_rules
    # Test different policy actions
    redirect_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'redirect',
      user: @user,
      name: "Redirect Brazil",
      additional_data: {
        'redirect_url' => 'https://example.com/blocked',
        'redirect_status' => 302
      }
    )

    challenge_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'challenge',
      user: @user,
      name: "Challenge Brazil",
      additional_data: {
        'challenge_type' => 'captcha',
        'challenge_message' => 'Please verify you are human'
      }
    )

    # Process Brazilian event for redirect policy. The returned event is not
    # needed below; the call is kept so the same record is still created.
    Event.create_from_waf_payload!("redirect-test", @brazil_event_data)
    network_range = network_range_for(@brazil_ip, country: 'BR')

    # Manually create rule for redirect policy to test metadata handling
    redirect_rule = redirect_policy.create_rule_for_network_range(network_range)
    assert redirect_rule.persisted?
    assert_equal 'redirect', redirect_rule.action
    assert_equal 'https://example.com/blocked', redirect_rule.redirect_url
    assert_equal 302, redirect_rule.redirect_status

    # Manually create rule for challenge policy to test metadata handling
    challenge_rule = challenge_policy.create_rule_for_network_range(network_range)
    assert challenge_rule.persisted?
    assert_equal 'challenge', challenge_rule.action
    assert_equal 'captcha', challenge_rule.challenge_type
    assert_equal 'Please verify you are human', challenge_rule.challenge_message
  end

  private

  # Deletes all rows of the models used by these tests, in the same order the
  # original setup/teardown used.
  def purge_records
    Event.delete_all
    Rule.delete_all
    NetworkRange.delete_all
    WafPolicy.delete_all
    User.delete_all
  end

  # Builds a WAF event payload for the given client IP and geo data.
  #
  # Every call returns brand-new nested hashes, so a test can adjust its own
  # payload without altering @brazil_event_data. Copying with Hash#dup would
  # share the inner "request" and "geo" hashes between payloads.
  #
  # @param ip [String] value stored under request.ip
  # @param country_code [String] value stored under geo.country_code
  # @param city [String] value stored under geo.city
  # @return [Hash] string-keyed payload
  def build_event_data(ip:, country_code:, city:)
    {
      "request_id" => "brazil-test-123",
      "timestamp" => Time.now.iso8601,
      "request" => {
        "ip" => ip,
        "method" => "GET",
        "path" => "/api/test",
        "headers" => {
          "host" => "example.com",
          "user-agent" => "TestAgent/1.0"
        }
      },
      "response" => {
        "status_code" => 200,
        "duration_ms" => 150
      },
      "waf_action" => "allow",
      "server_name" => "test-server",
      "environment" => "test",
      "geo" => {
        "country_code" => country_code,
        "city" => city
      },
      "agent" => {
        "name" => "baffle-agent",
        "version" => "1.0.0"
      }
    }
  end

  # Finds or creates the network range for +ip+ and sets its country, which
  # stands in for a geo-lookup. Returns the updated network range.
  def network_range_for(ip, country:)
    range = NetworkRangeGenerator.find_or_create_for_ip(ip)
    range.update!(country: country)
    range
  end

  # Returns the relation of 'deny' rules tied to both +network_range+ and +policy+.
  def deny_rules_for(network_range, policy)
    Rule.where(
      network_range: network_range,
      policy_action: 'deny',
      waf_policy: policy
    )
  end
end