changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / rust/lib/net/src/engine/dns/client.rs

changeset 698: 96958d3eb5b0
parent: 611060cba9f0
author: Richard Westhaver <ellis@rwest.io>
date: Fri, 04 Oct 2024 22:04:59 -0400
permissions: -rw-r--r--
description: fixes
1 use crate::Result;
2 
3 use std::{
4  collections::HashSet,
5  net::IpAddr,
6  sync::{Arc, Mutex},
7  thread::{Builder, JoinHandle},
8 };
9 
10 use tokio::{
11  runtime::Runtime,
12  sync::mpsc::{self, Sender},
13 };
14 
15 use super::{resolver::Lookup, IpMap};
16 
17 type PendingAddrs = HashSet<IpAddr>;
18 
19 const CHANNEL_SIZE: usize = 1_000;
20 
21 pub struct Client {
22  cache: Arc<Mutex<IpMap>>,
23  pending: Arc<Mutex<PendingAddrs>>,
24  tx: Option<Sender<Vec<IpAddr>>>,
25  handle: Option<JoinHandle<()>>,
26 }
27 
28 impl Client {
29  pub fn new<R>(resolver: R, runtime: Runtime) -> Result<Self>
30  where
31  R: Lookup + Send + Sync + 'static,
32  {
33  let cache = Arc::new(Mutex::new(IpMap::new()));
34  let pending = Arc::new(Mutex::new(PendingAddrs::new()));
35  let (tx, mut rx) = mpsc::channel::<Vec<IpAddr>>(CHANNEL_SIZE);
36 
37  let handle = Builder::new().name("resolver".into()).spawn({
38  let cache = cache.clone();
39  let pending = pending.clone();
40  move || {
41  runtime.block_on(async {
42  let resolver = Arc::new(resolver);
43 
44  while let Some(ips) = rx.recv().await {
45  for ip in ips {
46  tokio::spawn({
47  let resolver = resolver.clone();
48  let cache = cache.clone();
49  let pending = pending.clone();
50  async move {
51  if let Some(n) = resolver.lookup(ip).await {
52  let name: String = n.to_owned();
53  cache.lock().unwrap().insert(ip, name);
54  }
55  pending.lock().unwrap().remove(&ip);
56  }
57  });
58  }
59  }
60  });
61  }
62  })?;
63 
64  Ok(Self {
65  cache,
66  pending,
67  tx: Some(tx),
68  handle: Some(handle),
69  })
70  }
71 
72  pub fn resolve(&mut self, ips: Vec<IpAddr>) {
73  // Remove ips that are already being resolved
74  let ips = ips
75  .into_iter()
76  .filter(|ip| self.pending.lock().unwrap().insert(*ip))
77  .collect::<Vec<_>>();
78 
79  if !ips.is_empty() {
80  // Discard the message if the channel is full; it will be retried
81  // eventually
82  let _ = self.tx.as_mut().unwrap().try_send(ips);
83  }
84  }
85 
86  pub fn cache(&mut self) -> IpMap {
87  let cache = self.cache.lock().unwrap();
88  cache.clone()
89  }
90 }
91 
92 impl Drop for Client {
93  fn drop(&mut self) {
94  // Do the Option dance to be able to drop the sender so that the receiver
95  // finishes and the thread can be joined
96  drop(self.tx.take().unwrap());
97  self.handle.take().unwrap().join().unwrap();
98  }
99 }