b.liu | e958203 | 2025-04-17 19:18:16 +0800 | [diff] [blame] | 1 | --- a/mcproxy/src/proxy/proxy_instance.cpp |
| 2 | +++ b/mcproxy/src/proxy/proxy_instance.cpp |
| 3 | @@ -171,6 +171,9 @@ void proxy_instance::worker_thread() |
| 4 | HC_LOG_TRACE(""); |
| 5 | while (m_running) { |
| 6 | auto msg = m_job_queue.dequeue(); |
| 7 | + |
| 8 | + HC_LOG_DEBUG("Proxy Message: " << msg->get_message_type_name(msg->get_type()) ); |
| 9 | + |
| 10 | switch (msg->get_type()) { |
| 11 | case proxy_msg::TEST_MSG: |
| 12 | (*msg)(); |
| 13 | @@ -193,25 +196,66 @@ void proxy_instance::worker_thread() |
| 14 | } |
| 15 | break; |
| 16 | case proxy_msg::GROUP_RECORD_MSG: { |
| 17 | - auto r = std::static_pointer_cast<group_record_msg>(msg); |
| 18 | + auto gr = std::static_pointer_cast<group_record_msg>(msg); |
| 19 | |
| 20 | if (m_in_debug_testing_mode) { |
| 21 | std::cout << "!!--ACTION: receive record" << std::endl; |
| 22 | - std::cout << *r << std::endl; |
| 23 | + std::cout << *gr << std::endl; |
| 24 | std::cout << std::endl; |
| 25 | } |
| 26 | |
| 27 | - auto it = m_downstreams.find(r->get_if_index()); |
| 28 | + auto slist = gr->get_slist(); |
| 29 | + addr_storage saddr; |
| 30 | + if (slist.empty()) { |
| 31 | + saddr = "0.0.0.0"; |
| 32 | + } else { |
| 33 | + saddr = slist.begin()->saddr; |
| 34 | + } |
| 35 | + auto it = m_downstreams.find(gr->get_if_index()); |
| 36 | if (it != std::end(m_downstreams)) { |
| 37 | - it->second.m_querier->receive_record(msg); |
| 38 | + // Check for input filters |
| 39 | + if (!it->second.m_interface->match_input_filter(interfaces::get_if_name(gr->get_if_index()), saddr, gr->get_gaddr())) |
| 40 | + { |
| 41 | + HC_LOG_DEBUG("group report " << gr->get_gaddr() << " filtered"); |
| 42 | + } |
| 43 | + else |
| 44 | + { |
| 45 | + it->second.m_querier->receive_record(msg); |
| 46 | + } |
| 47 | } else { |
| 48 | - HC_LOG_DEBUG("failed to find querier of interface: " << interfaces::get_if_name(std::static_pointer_cast<timer_msg>(msg)->get_if_index())); |
| 49 | + HC_LOG_DEBUG("failed to find querier of interface: " << interfaces::get_if_name( gr->get_if_index() )); |
| 50 | } |
| 51 | - } |
| 52 | + } |
| 53 | + break; |
| 54 | + case proxy_msg::NEW_SOURCE_MSG: { |
| 55 | + auto sm = std::static_pointer_cast<new_source_msg>(msg); |
| 56 | + // Find the interface |
| 57 | + std::shared_ptr<interface> interf; |
| 58 | + auto it = m_downstreams.find(sm->get_if_index()); |
| 59 | + if (it != std::end(m_downstreams)) { |
| 60 | + interf = it->second.m_interface; |
| 61 | + } else { |
| 62 | + for (auto & e : m_upstreams) { |
| 63 | + if (e.m_if_index == sm->get_if_index()) { |
| 64 | + interf = e.m_interface; |
| 65 | + break; |
| 66 | + } |
| 67 | + } |
| 68 | + } |
| 69 | + if ( !interf ) |
| 70 | + { |
| 71 | + HC_LOG_DEBUG("failed to find interface: " << interfaces::get_if_name( sm->get_if_index() ) << " for Source message " << sm->get_saddr() << " | " << sm->get_gaddr() ); |
| 72 | + break; |
| 73 | + } |
| 74 | + // Check for input filters |
| 75 | + if (!interf->match_input_filter(interfaces::get_if_name(sm->get_if_index()), sm->get_saddr(), sm->get_gaddr())) |
| 76 | + { |
| 77 | + HC_LOG_DEBUG("source " << sm->get_saddr() << " | " << sm->get_gaddr() << " filtered"); |
| 78 | + } else { |
| 79 | + m_routing_management->event_new_source(msg); |
| 80 | + } |
| 81 | + } |
| 82 | break; |
| 83 | - case proxy_msg::NEW_SOURCE_MSG: |
| 84 | - m_routing_management->event_new_source(msg); |
| 85 | - break; |
| 86 | case proxy_msg::NEW_SOURCE_TIMER_MSG: |
| 87 | m_routing_management->timer_triggerd_maintain_routing_table(msg); |
| 88 | break; |