Skip to main content

Grove/Protocol/
SpineConnection.rs

1//! Spine Connection Module
2//! ☀️ 🟡 MOUNTAIN_GROVE_WASM - WASM+Rhai extension host connection
3//!
4//! This module provides gRPC-based communication for extension host
5//! integration. Maintains full backwards compatibility while adding optional
6//! EchoAction support.
7//!
8//! ## Architecture (Backwards Compatible)
9//!
10//! - **Legacy RPC Layer**: Original gRPC client (unchanged)
11//! - **New EchoAction Layer**: Optional bidirectional actions (feature-gated)
12//! - **Dual Protocol**: Both can be used simultaneously
13//!
14//! ## Feature Gates
15//!
16//! - `grove_rpc` (default) - Enable legacy RPC layer
17//! - `grove_echo` (new, feature-gated) - Enable EchoAction layer
18//!
19//! ## Usage
20//!
//! ### Legacy (Unchanged)
//!
//! ```rust,ignore
//! use crate::Protocol::ProtocolConfig;
//!
//! let mut connection = SpineConnectionImpl::new(config);
//! connection.Connect().await?;
//! let response = connection.SendRequest(method, payload).await?;
//! ```
//!
//! ### With EchoAction (New, Optional)
//!
//! Requires the `grove_echo` feature.
//!
//! ```rust,ignore
//! let mut connection = SpineConnectionImpl::new(config);
//! connection.Connect().await?;
//! connection.ConnectEchoClient().await?;
//!
//! // Use either method
//! let response = connection.SendRequest(method, payload).await?; // OLD: works
//! let echo_response = connection.SendEchoAction(action).await?; // NEW: optional
//! ```
36
#[cfg(feature = "grove_echo")]
use std::collections::HashMap;
use std::sync::Arc;

#[cfg(feature = "grove_echo")]
use anyhow::Context;
use anyhow::Result;
use tokio::sync::RwLock;
use tracing::{debug, info, instrument, warn};

use crate::Protocol::ProtocolConfig;
#[cfg(feature = "grove_echo")]
use crate::vine::generated::vine::{
	EchoAction,
	EchoActionResponse,
	echo_action_service_client::EchoActionServiceClient,
};
50
/// Lifecycle state of the connection to Spine.
///
/// The enum is fieldless, so it is `Copy` and supports total equality and
/// hashing (usable as a `HashMap`/`HashSet` key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionState {
	/// Disconnected from Spine
	Disconnected,
	/// A connection attempt to Spine is in progress
	Connecting,
	/// Connected to Spine
	Connected,
	/// The connection is in an error state
	Error,
}
63
/// Heartbeat configuration for connection monitoring
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeartbeatConfig {
	/// Interval between heartbeats in seconds
	pub interval_seconds:u64,
	/// Maximum number of missed heartbeats before considering connection lost
	pub max_missed:u32,
	/// Whether heartbeat is enabled
	pub enabled:bool,
}

/// Default heartbeat settings: enabled, one heartbeat every 30 seconds, and
/// up to 3 missed heartbeats tolerated.
impl Default for HeartbeatConfig {
	fn default() -> Self { Self { interval_seconds:30, max_missed:3, enabled:true } }
}
79
/// Connection metrics for monitoring
///
/// All counters start at zero (`Default`). Snapshots can be compared with
/// `==`, which is convenient for detecting changes between two reads.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConnectionMetrics {
	/// Total number of requests sent
	pub total_requests:u64,
	/// Number of successful requests
	pub successful_requests:u64,
	/// Number of failed requests
	pub failed_requests:u64,
	/// Connection uptime in seconds
	pub uptime_seconds:u64,
	/// Number of reconnection attempts
	pub reconnections:u64,
}
94
95/// Spine connection implementation
96pub struct SpineConnectionImpl {
97	/// Protocol configuration
98	config:Arc<RwLock<ProtocolConfig>>,
99	/// Current connection state
100	state:Arc<RwLock<ConnectionState>>,
101
102	#[cfg(feature = "grove_echo")]
103	/// Echo client for testing
104	echo_client:Option<EchoActionServiceClient<tonic::transport::Channel>>,
105
106	/// Heartbeat configuration
107	heartbeat_config:HeartbeatConfig,
108	/// Timestamp of the last heartbeat
109	last_heartbeat:Arc<RwLock<chrono::DateTime<chrono::Utc>>>,
110	/// Connection metrics
111	metrics:Arc<RwLock<ConnectionMetrics>>,
112}
113
114impl SpineConnectionImpl {
115	/// Create a new Spine connection
116	///
117	/// # Arguments
118	///
119	/// * `config` - Protocol configuration
120	///
121	/// # Returns
122	///
123	/// A new SpineConnectionImpl instance
124	#[instrument(skip(config))]
125	pub fn new(config:ProtocolConfig) -> Self {
126		Self {
127			config:Arc::new(RwLock::new(config)),
128			state:Arc::new(RwLock::new(ConnectionState::Disconnected)),
129
130			#[cfg(feature = "grove_echo")]
131			echo_client:None,
132
133			heartbeat_config:HeartbeatConfig::default(),
134			last_heartbeat:Arc::new(RwLock::new(chrono::Utc::now())),
135			metrics:Arc::new(RwLock::new(ConnectionMetrics::default())),
136		}
137	}
138
139	/// Connect to the Spine service
140	#[instrument(skip(self))]
141	pub async fn Connect(&mut self) -> Result<()> {
142		let guard = self.config.read().await;
143		let url = guard.mountain_endpoint.clone();
144		drop(guard);
145
146		info!("Connecting to Spine at: {}", url);
147		*self.state.write().await = ConnectionState::Connecting;
148		*self.state.write().await = ConnectionState::Connected;
149		*self.last_heartbeat.write().await = chrono::Utc::now();
150		info!("Successfully connected to Spine");
151		Ok(())
152	}
153
154	/// Disconnect from the Spine service
155	#[instrument(skip(self))]
156	pub async fn Disconnect(&mut self) -> Result<()> {
157		info!("Disconnecting from Spine");
158
159		#[cfg(feature = "grove_echo")]
160		{
161			self.echo_client = None;
162		}
163
164		*self.state.write().await = ConnectionState::Disconnected;
165		info!("Successfully disconnected from Spine");
166		Ok(())
167	}
168
169	/// Get the current connection state
170	pub async fn GetState(&self) -> ConnectionState { *self.state.read().await }
171
172	/// Send a request to the Spine service
173	///
174	/// # Arguments
175	///
176	/// * `method` - The method name to call
177	/// * `payload` - The request payload
178	#[instrument(skip(self, _payload))]
179	pub async fn SendRequest(&self, method:&str, _payload:Vec<u8>) -> Result<Vec<u8>> {
180		if self.GetState().await != ConnectionState::Connected {
181			return Err(anyhow::anyhow!("Not connected to Spine"));
182		}
183
184		debug!("Sending request: {}", method);
185
186		let mut metrics = self.metrics.write().await;
187		metrics.total_requests += 1;
188		metrics.successful_requests += 1;
189		Ok(Vec::new())
190	}
191
192	/// Get the connection metrics
193	pub async fn GetMetrics(&self) -> ConnectionMetrics { self.metrics.read().await.clone() }
194
195	/// Set the heartbeat configuration
196	pub fn SetHeartbeatConfig(&mut self, config:HeartbeatConfig) { self.heartbeat_config = config; }
197}
198
#[cfg(feature = "grove_echo")]
impl SpineConnectionImpl {
	/// Connect the EchoAction gRPC client
	///
	/// Builds a tonic channel from the configured `mountain_endpoint`,
	/// connects it and stores the resulting client on this connection.
	///
	/// # Errors
	///
	/// Returns an error when the endpoint is not a valid URI or when the
	/// channel fails to connect.
	#[instrument(skip(self))]
	pub async fn ConnectEchoClient(&mut self) -> Result<()> {
		// The read guard is a temporary and is released at the end of this
		// statement, so it is not held across the network connect below.
		let url = self.config.read().await.mountain_endpoint.clone();

		info!("Connecting EchoAction client to: {}", url);

		let channel = tonic::transport::Channel::from_shared(url)
			.context("Invalid Mountain URL")?
			.connect()
			.await
			.context("Failed to connect EchoAction client")?;

		self.echo_client = Some(EchoActionServiceClient::new(channel));
		info!("EchoAction client connected");
		Ok(())
	}

	/// Send a single `EchoAction` and return the response
	///
	/// # Errors
	///
	/// Returns an error when the connection is not in the `Connected` state,
	/// when no EchoAction client has been connected, when the call itself
	/// fails, or when the response reports `success == false`.
	#[instrument(skip(self, action))]
	pub async fn SendEchoAction(&self, action:EchoAction) -> Result<EchoActionResponse> {
		if self.GetState().await != ConnectionState::Connected {
			return Err(anyhow::anyhow!("Not connected to Spine"));
		}

		// Generated tonic clients issue calls through `&mut self`, which is
		// not available from `&self`. Work on a clone of the stored client
		// instead of borrowing it.
		let mut client = self
			.echo_client
			.clone()
			.ok_or_else(|| anyhow::anyhow!("EchoAction client not connected"))?;

		let response = client
			.send_echo_action(action)
			.await
			.context("Failed to send EchoAction")?
			.into_inner();

		if !response.success {
			anyhow::bail!("EchoAction failed: {}", response.error);
		}

		Ok(response)
	}

	/// Send an RPC call wrapped in an `EchoAction` of type `"rpc"`
	///
	/// # Arguments
	///
	/// * `method` - RPC method name; stored under the `rpc_method` header
	/// * `payload` - Request payload bytes
	/// * `metadata` - Additional headers to attach to the action
	///
	/// # Returns
	///
	/// The `result` bytes of the response.
	///
	/// # Errors
	///
	/// Propagates any error from `SendEchoAction`.
	pub async fn SendRpcViaEcho(
		&self,
		method:&str,
		payload:Vec<u8>,
		metadata:HashMap<String, String>,
	) -> Result<Vec<u8>> {
		let mut headers = metadata;
		headers.insert("rpc_method".to_string(), method.to_string());

		let response = self.SendEchoAction(Self::BuildEchoAction("rpc", payload, headers)).await?;
		Ok(response.result)
	}

	/// Send an event wrapped in an `EchoAction` of type `"event"`
	///
	/// # Arguments
	///
	/// * `event_name` - Event name; stored under the `event_name` header
	/// * `payload` - Event payload bytes
	/// * `metadata` - Additional headers to attach to the action
	///
	/// # Errors
	///
	/// Propagates any error from `SendEchoAction`.
	pub async fn SendEventViaEcho(
		&self,
		event_name:&str,
		payload:Vec<u8>,
		metadata:HashMap<String, String>,
	) -> Result<()> {
		let mut headers = metadata;
		headers.insert("event_name".to_string(), event_name.to_string());

		self.SendEchoAction(Self::BuildEchoAction("event", payload, headers)).await?;
		Ok(())
	}

	/// Whether an EchoAction client has been connected
	pub fn IsEchoAvailable(&self) -> bool { self.echo_client.is_some() }

	/// Build a Grove-to-Mountain `EchoAction` with a fresh UUID and the
	/// current UTC timestamp (seconds).
	fn BuildEchoAction(action_type:&str, payload:Vec<u8>, headers:HashMap<String, String>) -> EchoAction {
		EchoAction {
			action_id:uuid::Uuid::new_v4().to_string(),
			source:"grove".to_string(),
			target:"mountain".to_string(),
			action_type:action_type.to_string(),
			payload,
			headers,
			timestamp:chrono::Utc::now().timestamp(),
			nested_actions:vec![],
		}
	}
}
294
#[cfg(test)]
mod tests {
	use super::*;

	/// Build a connection configured with a local endpoint. The endpoint is
	/// never dialled by the methods exercised in these tests.
	fn LocalConnection() -> SpineConnectionImpl {
		let config = ProtocolConfig { mountain_endpoint:"http://127.0.0.1:50051".to_string(), ..Default::default() };
		SpineConnectionImpl::new(config)
	}

	#[test]
	fn test_connection_state() {
		let state = ConnectionState::Connected;
		assert_eq!(state, ConnectionState::Connected);
		// Distinct variants must not compare equal.
		assert_ne!(state, ConnectionState::Disconnected);
		assert_ne!(ConnectionState::Connecting, ConnectionState::Error);
	}

	#[test]
	fn test_heartbeat_config_default() {
		let config = HeartbeatConfig::default();
		assert_eq!(config.interval_seconds, 30);
		assert_eq!(config.max_missed, 3);
		assert!(config.enabled);
	}

	#[tokio::test]
	async fn test_spine_connection_creation() {
		let connection = LocalConnection();
		assert_eq!(connection.GetState().await, ConnectionState::Disconnected);
	}

	#[tokio::test]
	async fn test_connect_disconnect_lifecycle() {
		let mut connection = LocalConnection();

		connection.Connect().await.expect("Connect only records a state transition");
		assert_eq!(connection.GetState().await, ConnectionState::Connected);

		connection.Disconnect().await.expect("Disconnect only records a state transition");
		assert_eq!(connection.GetState().await, ConnectionState::Disconnected);
	}

	#[tokio::test]
	async fn test_send_request_requires_connection() {
		let connection = LocalConnection();

		let result = connection.SendRequest("ping", Vec::new()).await;

		assert!(result.is_err());
		assert_eq!(connection.GetMetrics().await.successful_requests, 0);
	}

	#[tokio::test]
	async fn test_send_request_records_success() {
		let mut connection = LocalConnection();
		connection.Connect().await.expect("Connect only records a state transition");

		let response = connection.SendRequest("ping", vec![1, 2, 3]).await.expect("connected request succeeds");

		assert!(response.is_empty());
		assert_eq!(connection.GetMetrics().await.successful_requests, 1);
	}
}