# frozen_string_literal: true

require "test_helper"

# Integration tests for country-based WAF policies.
#
# Each test ingests a WAF event payload, resolves the network range for the
# event's IP, runs ProcessWafPoliciesJob for that range, and then inspects the
# Rule records that were (or were not) generated from the policies under test.
#
# NOTE(review): this file refers to a rule's action through three different
# names (`policy_action` in `Rule.where`, `rule.waf_action`, and
# `redirect_rule.action` / `challenge_rule.action`). They are left as written;
# confirm against the Rule model which ones actually exist.
class WafPolicyIntegrationTest < ActiveSupport::TestCase
  # Run every test inside a database transaction. This setting does not affect
  # fixture loading; the tests start from a clean slate because `setup` deletes
  # all relevant records explicitly.
  self.use_transactional_tests = true

  # Wipes existing data, then creates a user, a project, a "Block Brazil"
  # country policy, and a sample event payload for a Brazilian IP.
  def setup
    clear_all_records

    @user = User.create!(email_address: "test@example.com", password: "password")
    @project = Project.create!(name: "Test Project", slug: "test-project", public_key: "test-key-123456")

    # Create a WAF policy to block Brazil
    @brazil_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'deny',
      user: @user,
      name: "Block Brazil"
    )

    # Sample event data for a Brazilian IP
    @brazil_ip = "177.104.144.10" # Known Brazilian IP
    @brazil_event_data = {
      "request_id" => "brazil-test-123",
      "timestamp" => Time.now.iso8601,
      "request" => {
        "ip" => @brazil_ip,
        "method" => "GET",
        "path" => "/api/test",
        "headers" => {
          "host" => "example.com",
          "user-agent" => "TestAgent/1.0"
        }
      },
      "response" => {
        "status_code" => 200,
        "duration_ms" => 150
      },
      "waf_action" => "allow",
      "server_name" => "test-server",
      "environment" => "test",
      "geo" => {
        "country_code" => "BR",
        "city" => "São Paulo"
      },
      "agent" => {
        "name" => "baffle-agent",
        "version" => "1.0.0"
      }
    }
  end

  # Removes everything the tests created, using the same cleanup as `setup`.
  def teardown
    clear_all_records
  end

  test "Brazil WAF policy generates block rule when Brazilian event is processed" do
    # Process the Brazilian event
    event = Event.create_from_waf_payload!("brazil-test", @brazil_event_data)
    assert event.persisted?
    assert_equal "BR", event.country_code
    assert_equal @brazil_ip, event.ip_address

    # Ensure network range exists for the Brazilian IP
    network_range = NetworkRangeGenerator.find_or_create_for_ip(@brazil_ip)
    assert network_range.persisted?
    assert network_range.contains_ip?(@brazil_ip)

    # Set the country on the network range to simulate geo-lookup
    network_range.update!(country: 'BR')

    # Process WAF policies for this network range
    ProcessWafPoliciesJob.perform_now(network_range: network_range, event: event)

    # Verify that a blocking rule was generated
    generated_rules = Rule.where(
      network_range: network_range,
      policy_action: 'deny',
      waf_policy: @brazil_policy
    )
    assert_equal 1, generated_rules.count, "Should have generated exactly one blocking rule"

    rule = generated_rules.first
    assert_equal 'deny', rule.waf_action
    assert_equal network_range, rule.network_range
    assert_equal @brazil_policy, rule.waf_policy
    assert_equal "policy:Block Brazil", rule.source
    assert rule.enabled?, "Generated rule should be enabled"

    # Verify rule metadata contains policy information
    metadata = rule.metadata
    assert_equal @brazil_policy.id, metadata['generated_by_policy']
    assert_equal "Block Brazil", metadata['policy_name']
    assert_equal "country", metadata['policy_type']
    assert_equal "country", metadata['matched_field']
    assert_equal "BR", metadata['matched_value']
  end

  test "Non-Brazilian event does not generate block rule from Brazil policy" do
    # Create event data for a US IP
    us_ip = "8.8.8.8" # Known US IP
    us_event_data = event_data_for(
      event_id: "us-test-123",
      ip: us_ip,
      country_code: "US",
      city: "Mountain View"
    )

    # Process the US event
    event = Event.create_from_waf_payload!("us-test", us_event_data)
    assert event.persisted?
    assert_equal "US", event.country_code
    assert_equal us_ip, event.ip_address

    # Ensure network range exists for the US IP
    network_range = NetworkRangeGenerator.find_or_create_for_ip(us_ip)
    assert network_range.persisted?
    network_range.update!(country: 'US')

    # Process WAF policies for this network range
    ProcessWafPoliciesJob.perform_now(network_range: network_range, event: event)

    # Verify that no blocking rule was generated
    generated_rules = Rule.where(
      network_range: network_range,
      policy_action: 'deny',
      waf_policy: @brazil_policy
    )
    assert_equal 0, generated_rules.count, "Should not have generated any blocking rules for US IP"
  end

  test "Multiple country policies generate rules for matching countries only" do
    # Create additional policy to block China
    china_policy = WafPolicy.create_country_policy(
      ['CN'],
      policy_action: 'deny',
      user: @user,
      name: "Block China"
    )

    # Create Chinese IP event
    china_ip = "220.181.38.148" # Known Chinese IP
    china_event_data = event_data_for(
      event_id: "china-test-123",
      ip: china_ip,
      country_code: "CN",
      city: "Beijing"
    )

    # Process Chinese event
    china_event = Event.create_from_waf_payload!("china-test", china_event_data)
    china_network_range = NetworkRangeGenerator.find_or_create_for_ip(china_ip)
    china_network_range.update!(country: 'CN')

    # Process Brazilian event (from setup). The Chinese payload above is a deep
    # copy, so @brazil_event_data still carries the Brazilian IP and country.
    brazil_event = Event.create_from_waf_payload!("brazil-test", @brazil_event_data)
    brazil_network_range = NetworkRangeGenerator.find_or_create_for_ip(@brazil_ip)
    brazil_network_range.update!(country: 'BR')

    # Process WAF policies for both network ranges
    ProcessWafPoliciesJob.perform_now(network_range: brazil_network_range, event: brazil_event)
    ProcessWafPoliciesJob.perform_now(network_range: china_network_range, event: china_event)

    # Verify Brazil IP matched Brazil policy only
    brazil_rules = Rule.where(network_range: brazil_network_range)
    assert_equal 1, brazil_rules.count
    brazil_rule = brazil_rules.first
    assert_equal @brazil_policy, brazil_rule.waf_policy
    assert_equal "BR", brazil_rule.metadata['matched_value']

    # Verify China IP matched China policy only
    china_rules = Rule.where(network_range: china_network_range)
    assert_equal 1, china_rules.count
    china_rule = china_rules.first
    assert_equal china_policy, china_rule.waf_policy
    assert_equal "CN", china_rule.metadata['matched_value']
  end

  test "Policy expiration prevents rule generation" do
    # Create an expired Brazil policy
    expired_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'deny',
      user: @user,
      name: "Expired Brazil Block",
      expires_at: 1.day.ago
    )

    # Process Brazilian event
    event = Event.create_from_waf_payload!("expired-test", @brazil_event_data)
    network_range = NetworkRangeGenerator.find_or_create_for_ip(@brazil_ip)
    network_range.update!(country: 'BR')

    # Process WAF policies
    ProcessWafPoliciesJob.perform_now(network_range: network_range, event: event)

    # Verify that no rule was generated from expired policy. The query is
    # scoped to expired_policy, so rules from the policy created in setup are
    # not counted here.
    generated_rules = Rule.where(
      network_range: network_range,
      policy_action: 'deny',
      waf_policy: expired_policy
    )
    assert_equal 0, generated_rules.count, "Expired policy should not generate rules"
  end

  test "Disabled policy prevents rule generation" do
    # Create a disabled Brazil policy
    disabled_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'deny',
      user: @user,
      name: "Disabled Brazil Block"
    )
    disabled_policy.update!(enabled: false)

    # Process Brazilian event
    event = Event.create_from_waf_payload!("disabled-test", @brazil_event_data)
    network_range = NetworkRangeGenerator.find_or_create_for_ip(@brazil_ip)
    network_range.update!(country: 'BR')

    # Process WAF policies
    ProcessWafPoliciesJob.perform_now(network_range: network_range, event: event)

    # Verify that no rule was generated from disabled policy. The query is
    # scoped to disabled_policy, so rules from the policy created in setup are
    # not counted here.
    generated_rules = Rule.where(
      network_range: network_range,
      policy_action: 'deny',
      waf_policy: disabled_policy
    )
    assert_equal 0, generated_rules.count, "Disabled policy should not generate rules"
  end

  test "Policy action types are correctly applied to generated rules" do
    # Test different policy actions
    redirect_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'redirect',
      user: @user,
      name: "Redirect Brazil",
      additional_data: {
        'redirect_url' => 'https://example.com/blocked',
        'redirect_status' => 302
      }
    )

    challenge_policy = WafPolicy.create_country_policy(
      ['BR'],
      policy_action: 'challenge',
      user: @user,
      name: "Challenge Brazil",
      additional_data: {
        'challenge_type' => 'captcha',
        'challenge_message' => 'Please verify you are human'
      }
    )

    # Process Brazilian event for redirect policy. The returned event is not
    # needed below; the call is kept for the records it creates.
    Event.create_from_waf_payload!("redirect-test", @brazil_event_data)
    network_range = NetworkRangeGenerator.find_or_create_for_ip(@brazil_ip)
    network_range.update!(country: 'BR')

    # Manually create rule for redirect policy to test metadata handling
    redirect_rule = redirect_policy.create_rule_for_network_range(network_range)
    assert redirect_rule.persisted?
    assert_equal 'redirect', redirect_rule.action
    assert_equal 'https://example.com/blocked', redirect_rule.redirect_url
    assert_equal 302, redirect_rule.redirect_status

    # Manually create rule for challenge policy to test metadata handling
    challenge_rule = challenge_policy.create_rule_for_network_range(network_range)
    assert challenge_rule.persisted?
    assert_equal 'challenge', challenge_rule.action
    assert_equal 'captcha', challenge_rule.challenge_type
    assert_equal 'Please verify you are human', challenge_rule.challenge_message
  end

  private

  # Deletes every record type these tests create, in the order used by the
  # original setup.
  def clear_all_records
    Event.delete_all
    Rule.delete_all
    NetworkRange.delete_all
    WafPolicy.delete_all
    User.delete_all
    Project.delete_all
  end

  # Builds an event payload for another IP/country from the Brazilian sample.
  #
  # A deep copy is required: the "request" and "geo" values are nested hashes,
  # and a shallow `dup` would share them with @brazil_event_data, so the
  # assignments below would silently rewrite the Brazilian payload as well.
  #
  # NOTE(review): the base payload identifies itself with "request_id", while
  # the derived payloads set "event_id" (kept as in the original tests) —
  # confirm which key Event.create_from_waf_payload! reads.
  #
  # @param event_id [String] value stored under the "event_id" key
  # @param ip [String] value stored under request.ip
  # @param country_code [String] value stored under geo.country_code
  # @param city [String] value stored under geo.city
  # @return [Hash] an independent copy of the sample payload with overrides applied
  def event_data_for(event_id:, ip:, country_code:, city:)
    @brazil_event_data.deep_dup.tap do |payload|
      payload["event_id"] = event_id
      payload["request"]["ip"] = ip
      payload["geo"]["country_code"] = country_code
      payload["geo"]["city"] = city
    end
  end
end