1use std::fmt;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use serde::{Deserialize, Serialize};
11
12use crate::Transport::{
13 gRPCTransport::gRPCTransport,
14 IPCTransport::IPCTransport,
15 WASMTransport::WASMTransportImpl,
16};
17
18#[async_trait]
23pub trait TransportStrategy: Send + Sync {
24 type Error: std::error::Error + Send + Sync + 'static;
26
27 async fn connect(&self) -> Result<(), Self::Error>;
29
30 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error>;
32
33 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error>;
35
36 async fn close(&self) -> Result<(), Self::Error>;
38
39 fn is_connected(&self) -> bool;
41
42 fn transport_type(&self) -> TransportType;
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
48pub enum TransportType {
49 gRPC,
51 IPC,
53 WASM,
55 Unknown,
57}
58
59impl fmt::Display for TransportType {
60 fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result {
61 match self {
62 Self::gRPC => write!(f, "grpc"),
63 Self::IPC => write!(f, "ipc"),
64 Self::WASM => write!(f, "wasm"),
65 Self::Unknown => write!(f, "unknown"),
66 }
67 }
68}
69
70impl std::str::FromStr for TransportType {
71 type Err = anyhow::Error;
72
73 fn from_str(s:&str) -> Result<Self, Self::Err> {
74 match s.to_lowercase().as_str() {
75 "grpc" => Ok(Self::gRPC),
76 "ipc" => Ok(Self::IPC),
77 "wasm" => Ok(Self::WASM),
78 _ => Err(anyhow::anyhow!("Unknown transport type: {}", s)),
79 }
80 }
81}
82
83#[derive(Debug)]
87pub enum Transport {
88 gRPC(gRPCTransport),
90 IPC(IPCTransport),
92 WASM(WASMTransportImpl),
94}
95
96impl Transport {
97 pub fn transport_type(&self) -> TransportType {
99 match self {
100 Self::gRPC(_) => TransportType::gRPC,
101 Self::IPC(_) => TransportType::IPC,
102 Self::WASM(_) => TransportType::WASM,
103 }
104 }
105
106 pub async fn connect(&self) -> anyhow::Result<()> {
108 match self {
109 Self::gRPC(transport) => {
110 transport
111 .connect()
112 .await
113 .map_err(|e| anyhow::anyhow!("gRPC connect error: {}", e))
114 },
115 Self::IPC(transport) => {
116 transport
117 .connect()
118 .await
119 .map_err(|e| anyhow::anyhow!("IPC connect error: {}", e))
120 },
121 Self::WASM(transport) => {
122 transport
123 .connect()
124 .await
125 .map_err(|e| anyhow::anyhow!("WASM connect error: {}", e))
126 },
127 }
128 }
129
130 pub async fn send(&self, request:&[u8]) -> anyhow::Result<Vec<u8>> {
132 match self {
133 Self::gRPC(transport) => {
134 transport
135 .send(request)
136 .await
137 .map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
138 },
139 Self::IPC(transport) => {
140 transport
141 .send(request)
142 .await
143 .map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
144 },
145 Self::WASM(transport) => {
146 transport
147 .send(request)
148 .await
149 .map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
150 },
151 }
152 }
153
154 pub async fn send_no_response(&self, data:&[u8]) -> anyhow::Result<()> {
156 match self {
157 Self::gRPC(transport) => {
158 transport
159 .send_no_response(data)
160 .await
161 .map_err(|e| anyhow::anyhow!("gRPC send error: {}", e))
162 },
163 Self::IPC(transport) => {
164 transport
165 .send_no_response(data)
166 .await
167 .map_err(|e| anyhow::anyhow!("IPC send error: {}", e))
168 },
169 Self::WASM(transport) => {
170 transport
171 .send_no_response(data)
172 .await
173 .map_err(|e| anyhow::anyhow!("WASM send error: {}", e))
174 },
175 }
176 }
177
178 pub async fn close(&self) -> anyhow::Result<()> {
180 match self {
181 Self::gRPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("gRPC close error: {}", e)),
182 Self::IPC(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("IPC close error: {}", e)),
183 Self::WASM(transport) => transport.close().await.map_err(|e| anyhow::anyhow!("WASM close error: {}", e)),
184 }
185 }
186
187 pub fn is_connected(&self) -> bool {
189 match self {
190 Self::gRPC(transport) => transport.is_connected(),
191 Self::IPC(transport) => transport.is_connected(),
192 Self::WASM(transport) => transport.is_connected(),
193 }
194 }
195
196 pub fn AsgRPC(&self) -> Option<&gRPCTransport> {
198 match self {
199 Self::gRPC(Transport) => Some(Transport),
200 _ => None,
201 }
202 }
203
204 pub fn AsIPC(&self) -> Option<&IPCTransport> {
206 match self {
207 Self::IPC(Transport) => Some(Transport),
208 _ => None,
209 }
210 }
211
212 pub fn as_wasm(&self) -> Option<&WASMTransportImpl> {
214 match self {
215 Self::WASM(transport) => Some(transport),
216 _ => None,
217 }
218 }
219}
220
221impl Default for Transport {
222 fn default() -> Self {
223 Self::gRPC(
224 gRPCTransport::New("127.0.0.1:50050").unwrap_or_else(|_| {
225 gRPCTransport::New("0.0.0.0:50050")
226 .expect("Failed to create default gRPC transport")
227 }),
228 )
229 }
230}
231
232impl fmt::Display for Transport {
233 fn fmt(&self, f:&mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Transport({})", self.transport_type()) }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct TransportMessage {
239 pub message_type:String,
241 pub message_id:String,
243 pub timestamp:u64,
245 pub payload:Bytes,
247 pub metadata:Option<serde_json::Value>,
249}
250
251impl TransportMessage {
252 pub fn new(message_type:impl Into<String>, payload:Bytes) -> Self {
254 Self {
255 message_type:message_type.into(),
256 message_id:uuid::Uuid::new_v4().to_string(),
257 timestamp:std::time::SystemTime::now()
258 .duration_since(std::time::UNIX_EPOCH)
259 .map(|d| d.as_secs())
260 .unwrap_or(0),
261 payload,
262 metadata:None,
263 }
264 }
265
266 pub fn with_metadata(mut self, metadata:serde_json::Value) -> Self {
268 self.metadata = Some(metadata);
269 self
270 }
271
272 pub fn to_bytes(&self) -> anyhow::Result<Bytes> {
274 serde_json::to_vec(self).map(Bytes::from).map_err(|e| anyhow::anyhow!("{}", e))
275 }
276
277 pub fn from_bytes(bytes:&[u8]) -> anyhow::Result<Self> {
279 serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!("{}", e))
280 }
281}
282
283#[derive(Debug, Clone, Default, Serialize, Deserialize)]
285pub struct TransportStats {
286 pub messages_sent:u64,
288 pub messages_received:u64,
290 pub errors:u64,
292 pub bytes_sent:u64,
294 pub bytes_received:u64,
296 pub avg_latency_us:u64,
298 pub uptime_seconds:u64,
300}
301
302impl TransportStats {
303 pub fn record_sent(&mut self, bytes:u64, latency_us:u64) {
305 self.messages_sent += 1;
306 self.bytes_sent += bytes;
307
308 if self.messages_sent > 0 {
310 self.avg_latency_us = (self.avg_latency_us * (self.messages_sent - 1) + latency_us) / self.messages_sent;
311 }
312 }
313
314 pub fn record_received(&mut self, bytes:u64) {
316 self.messages_received += 1;
317 self.bytes_received += bytes;
318 }
319
320 pub fn record_error(&mut self) { self.errors += 1; }
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327
328 #[test]
329 fn test_transport_type_to_string() {
330 assert_eq!(TransportType::gRPC.to_string(), "grpc");
331 assert_eq!(TransportType::IPC.to_string(), "ipc");
332 assert_eq!(TransportType::WASM.to_string(), "wasm");
333 }
334
335 #[test]
336 fn test_transport_type_from_str() {
337 assert_eq!("grpc".parse::<TransportType>().unwrap(), TransportType::gRPC);
338 assert_eq!("ipc".parse::<TransportType>().unwrap(), TransportType::IPC);
339 assert_eq!("wasm".parse::<TransportType>().unwrap(), TransportType::WASM);
340 assert!("unknown".parse::<TransportType>().is_err());
341 }
342
343 #[test]
344 fn test_transport_display() {
345 let transport = Transport::default();
348 let display = format!("{}", transport);
349 assert!(display.contains("Transport"));
350 }
351
352 #[test]
353 fn test_transport_message_creation() {
354 let message = TransportMessage::new("test_type", Bytes::from("hello"));
355 assert_eq!(message.message_type, "test_type");
356 assert_eq!(message.payload, Bytes::from("hello"));
357 assert!(!message.message_id.is_empty());
358 }
359
360 #[test]
361 fn test_transport_message_serialization() {
362 let message = TransportMessage::new("test", Bytes::from("data"));
363 let bytes = message.to_bytes().unwrap();
364 let deserialized = TransportMessage::from_bytes(&bytes).unwrap();
365 assert_eq!(deserialized.message_type, message.message_type);
366 assert_eq!(deserialized.payload, message.payload);
367 }
368
369 #[test]
370 fn test_transport_stats() {
371 let mut stats = TransportStats::default();
372 stats.record_sent(100, 1000);
373 stats.record_received(50);
374 stats.record_error();
375
376 assert_eq!(stats.messages_sent, 1);
377 assert_eq!(stats.messages_received, 1);
378 assert_eq!(stats.errors, 1);
379 assert_eq!(stats.bytes_sent, 100);
380 assert_eq!(stats.bytes_received, 50);
381 assert_eq!(stats.avg_latency_us, 1000);
382 }
383}