changelog shortlog graph tags branches changeset files revisions annotate raw help

Mercurial > core / rust/bin/alik/lib.rs

changeset 698: 96958d3eb5b0
parent: 94d358919982
author: Richard Westhaver <ellis@rwest.io>
date: Fri, 04 Oct 2024 22:04:59 -0400
permissions: -rw-r--r--
description: fixes
1 /// app/cli/alik/lib.rs --- Alik Lib
2 
3 // Helper of man
4 
5 /// Code:
6 pub mod graphql;
7 pub mod http;
8 pub mod ping;
9 pub mod udp;
10 
11 use db::{rocksdb, Db, DbConfigExt};
12 use net::{
13  axum::{
14  body::{Body, Bytes},
15  extract::State,
16  http::{HeaderMap, HeaderName, HeaderValue, StatusCode},
17  response::{IntoResponse, Response},
18  routing::get,
19  Router,
20  },
21  http::tower::trace::TraceLayer,
22  reqwest::Client,
23 };
24 
25 use serde::{Deserialize, Serialize};
26 
27 use std::{
28  collections::HashMap,
29  fs,
30  path::{Path, PathBuf},
31  sync::Arc,
32 };
33 
34 pub use krypt::KryptConfig;
35 
36 use logger::{info, log, tracing::Span};
37 
38 use obj::{Configure, NetworkConfig, Objective};
39 
40 use std::time::Duration;
41 
42 #[derive(Serialize, Deserialize, Clone, Debug, Default)]
43 pub struct AlikConfig {
44  krypt: KryptConfig,
45  db_path: Option<PathBuf>,
46  db_opts: HashMap<String, String>,
47  net: NetworkConfig,
48 }
49 impl AlikConfig {
50  pub fn new() -> Self {
51  AlikConfig::default()
52  }
53  pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, obj::Error> {
54  let s = fs::read_to_string(path)?;
55  AlikConfig::from_json_str(&s)
56  }
57 }
58 
59 obj::impl_config!(AlikConfig);
60 
61 impl DbConfigExt for AlikConfig {
62  fn db_path(self) -> Option<PathBuf> {
63  self.db_path
64  }
65  fn db_user(self) -> Option<String> {
66  None
67  }
68  fn set_db_config_value(&mut self, key: &str, val: &str) -> Option<String> {
69  self.db_opts.insert(key.to_string(), val.to_string())
70  }
71  fn get_db_config_value(self, key: &str) -> Option<String> {
72  self.db_opts.get(key).cloned()
73  }
74 }
75 
/// Marker trait for Alik services; it currently declares no methods.
pub trait AlikService {}
77 
78 #[derive(Debug)]
79 pub struct Alik {
80  db: Option<rocksdb::DB>,
81  config: Arc<AlikConfig>,
82  router: Option<Router>,
83 }
84 
85 impl Alik {
86  pub fn new() -> Alik {
87  Alik {
88  db: None,
89  config: Arc::new(AlikConfig::new()),
90  router: None,
91  }
92  }
93  pub fn with_config(cfg: &AlikConfig) -> Alik {
94  Alik {
95  db: None,
96  config: Arc::new(cfg.to_owned()),
97  router: None,
98  }
99  }
100  pub fn network_init(&mut self) -> Result<(), net::Error> {
101  let socket = &self.config.net.socket;
102  let peers = self.config.net.peers.as_ref();
103  info!("initializing on socket {:?}", socket);
104  info!("initializing with peers {:?}", peers.unwrap());
105  self.router = Some(Router::new().route("/", get("")));
106  Ok(())
107  }
108 }
109 
110 impl Db for Alik {
111  fn db_init(&self) -> Result<rocksdb::DB, db::Error> {
112  let path = self.config.db_path.as_ref();
113  let opts = &self.config.db_opts;
114  info!("{:?}", opts);
115  rocksdb::DB::open(&rocksdb::Options::default(), path.unwrap()).unwrap();
116  Ok(rocksdb::DB::open_default("").unwrap())
117  }
118  fn db_init_mut(&mut self) -> Result<(), db::Error> {
119  self.db = Some(self.db_init().unwrap());
120  Ok(())
121  }
122  fn db_open(&self) -> Result<(), db::Error> {
123  Ok(())
124  }
125  fn db_close(&self) -> Result<(), db::Error> {
126  if let Some(db) = &self.db {
127  db.cancel_all_background_work(true)
128  };
129  Ok(())
130  }
131  fn db_close_mut(&mut self) -> Result<(), db::Error> {
132  self.db_close().unwrap();
133  self.db = None;
134  Ok(())
135  }
136  fn db_query(&self) -> Result<(), db::Error> {
137  Ok(())
138  }
139  fn db_transaction(&self) -> Result<(), db::Error> {
140  Ok(())
141  }
142 }
143 
144 /// Server
145 pub async fn proxy_via_reqwest(State(client): State<Client>) -> Response {
146  let reqwest_response =
147  match client.get("https://compiler.company").send().await {
148  Ok(res) => res,
149  Err(err) => {
150  log::error!("{} {}", &err, "request failed");
151  return (StatusCode::BAD_REQUEST, Body::empty()).into_response();
152  }
153  };
154 
155  let response_builder =
156  Response::builder().status(reqwest_response.status().as_u16());
157 
158  // different http crate versions?
159  let mut headers = HeaderMap::with_capacity(reqwest_response.headers().len());
160  headers.extend(reqwest_response.headers().into_iter().map(
161  |(name, value)| {
162  let name = HeaderName::from_bytes(name.as_ref()).unwrap();
163  let value = HeaderValue::from_bytes(value.as_ref()).unwrap();
164  (name, value)
165  },
166  ));
167 
168  response_builder
169  .body(Body::from_stream(reqwest_response.bytes_stream()))
170  // body is empty, no error
171  .unwrap()
172 }
173 
174 pub async fn start_http_proxy(addr: &str) {
175  let client = Client::new();
176  let app = Router::new()
177  .route("/", get(proxy_via_reqwest))
178  .layer(TraceLayer::new_for_http().on_body_chunk(
179  |chunk: &Bytes, _latency: Duration, _span: &Span| {
180  log::debug!("streaming {} bytes", chunk.len());
181  },
182  ))
183  .with_state(client);
184  let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
185  println!(
186  "http_proxy running on: http://{}",
187  listener.local_addr().unwrap()
188  );
189  net::axum::serve(listener, app).await.unwrap();
190 }
191 
192 // pub async fn stream_some_data() -> Body {
193 // let stream = net::stream::iter(0..5)
194 // .throttle(Duration::from_secs(1))
195 // .map(|n| n.to_string())
196 // .map(Ok::<_, Infallible>);
197 // Body::from_stream(stream)
198 // }