product_os_connector/
lib.rs#![no_std]
extern crate no_std_compat as std;
use std::prelude::v1::*;
#[cfg(feature = "definition")]
mod definition;
#[cfg(all(feature = "connectors", feature = "definition"))]
mod authentication;
#[cfg(all(feature = "connectors", feature = "definition"))]
mod rest;
#[cfg(all(feature = "connectors", feature = "definition"))]
mod connector;
#[cfg(all(feature = "connectors", feature = "definition"))]
mod ws;
#[cfg(all(feature = "connectors", feature = "definition"))]
mod graphql;
#[cfg(all(feature = "connectors", feature = "definition"))]
mod interface;
#[cfg(feature = "connectors")]
use std::collections::BTreeMap;
#[cfg(feature = "connectors")]
use std::sync::Arc;
#[cfg(feature = "connectors")]
use std::time::Duration;
#[cfg(feature = "connectors")]
use product_os_capabilities::{Feature, RegistryFeature};
use serde::{Deserialize, Serialize};
#[cfg(feature = "definition")]
pub use crate::definition::Definition;
#[cfg(feature = "connectors")]
use async_trait::async_trait;
#[cfg(feature = "connectors")]
use parking_lot::Mutex;
#[cfg(feature = "connectors")]
use product_os_router::{Body, IntoResponse, Request, Response, StatusCode};
#[cfg(all(feature = "connectors", feature = "definition"))]
use crate::graphql::GraphQL;
#[cfg(all(feature = "connectors", feature = "definition"))]
use crate::interface::Interface;
#[cfg(all(feature = "connectors", feature = "definition"))]
use crate::rest::Rest;
#[cfg(all(feature = "connectors", feature = "definition"))]
use crate::ws::WebSocket;
/// The kind of connector a definition describes.
///
/// `ProductOSConnectors::new` matches on this value to decide which interface
/// implementation to build for a definition. Because of
/// `rename_all = "lowercase"`, the variants are (de)serialized as `"rest"`,
/// `"graphql"` and `"websocket"`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ConnectorKind {
/// Built as a `Rest` interface.
Rest,
/// Built as a `GraphQL` interface.
GraphQL,
/// Built as a `WebSocket` interface.
WebSocket,
}
/// Collection of connector interfaces built from a set of definitions.
///
/// Only compiled when both the `connectors` and `definition` features are
/// enabled.
#[cfg(all(feature = "connectors", feature = "definition"))]
pub struct ProductOSConnectors {
// Interfaces keyed by the identifier of the definition they were built from;
// each one is shared behind an `Arc<Mutex<..>>`.
interfaces: BTreeMap<String, Arc<Mutex<dyn Interface>>>
}
#[cfg(all(feature = "connectors", feature = "definition"))]
impl ProductOSConnectors {
    /// Maximum time to wait for a single interface lock before giving up on it.
    const LOCK_TIMEOUT: Duration = Duration::from_secs(10);

    /// Builds one interface per supplied definition and lets every interface
    /// know about the full set.
    ///
    /// The interface type is chosen from `definition.kind`; each interface is
    /// stored under `definition.info.identifier`. The keys of
    /// `predefined_definitions` are not used. If two definitions share an
    /// identifier, the later one replaces the earlier one and a warning is
    /// logged.
    ///
    /// After construction every interface receives a shared snapshot of the
    /// whole map through `register_interfaces`. An interface whose lock cannot
    /// be obtained within the timeout is skipped and a warning is logged.
    pub fn new(predefined_definitions: BTreeMap<String, Definition>) -> Self {
        tracing::debug!("Definitions: {:?}", predefined_definitions);

        let mut interfaces: BTreeMap<String, Arc<Mutex<dyn Interface>>> = BTreeMap::new();
        for definition in predefined_definitions.values() {
            let interface: Arc<Mutex<dyn Interface>> = match definition.kind {
                ConnectorKind::Rest => Arc::new(Mutex::new(Rest::new(definition))),
                ConnectorKind::GraphQL => Arc::new(Mutex::new(GraphQL::new(definition))),
                ConnectorKind::WebSocket => Arc::new(Mutex::new(WebSocket::new(definition))),
            };

            let replaced = interfaces.insert(definition.info.identifier.to_owned(), interface);
            if replaced.is_some() {
                tracing::warn!(
                    "Duplicate connector identifier {:?}; the earlier interface was replaced",
                    definition.info.identifier
                );
            }
        }

        // Snapshot of the map (the `Arc` handles are cloned, not the interfaces)
        // that is handed to every interface.
        // NOTE(review): if an interface stores this snapshot, it ends up holding
        // an `Arc` to itself - confirm whether that reference cycle is intended.
        let interfaces_to_register = Arc::new(interfaces.clone());
        for (identifier, interface) in &interfaces {
            match interface.try_lock_for(Self::LOCK_TIMEOUT) {
                Some(mut guard) => {
                    guard.register_interfaces(Some(interfaces_to_register.clone()));
                }
                None => {
                    tracing::warn!(
                        "Timed out locking connector interface {:?}; sibling interfaces were not registered on it",
                        identifier
                    );
                }
            }
        }

        Self { interfaces }
    }

    /// Asks every interface to register itself on `router`.
    ///
    /// Interfaces are visited in key order. Each lock attempt waits up to the
    /// timeout on the calling thread, and the lock stays held while that
    /// interface's `register` future is awaited. An interface whose lock cannot
    /// be obtained in time is skipped and a warning is logged.
    pub async fn setup_handlers(&self, router: &mut product_os_router::ProductOSRouter) {
        for (identifier, interface) in self.interfaces.iter() {
            match interface.try_lock_for(Self::LOCK_TIMEOUT) {
                Some(mut guard) => {
                    guard.register(router).await;
                }
                None => {
                    tracing::warn!(
                        "Timed out locking connector interface {:?}; its handlers were not registered",
                        identifier
                    );
                }
            }
        }
    }
}
#[cfg(all(feature = "connectors", feature = "definition"))]
#[async_trait]
impl Feature for ProductOSConnectors {
    /// Returns the fixed identifier of this feature, `"Connectors"`.
    fn identifier(&self) -> String {
        "Connectors".to_string()
    }

    /// Registers the handlers of every interface on `router` and reports the
    /// feature as serving `<base_path>/*sub_path`.
    ///
    /// The supplied `feature` handle is returned unchanged in the resulting
    /// `RegistryFeature`; `feature_mut` is left as `None`.
    async fn register(&self, feature: Arc<dyn Feature>, base_path: String, router: &mut product_os_router::ProductOSRouter) -> RegistryFeature {
        self.setup_handlers(router).await;

        // `base_path` is owned, so it can be extended in place without a clone.
        let mut path = base_path;
        path.push_str("/*sub_path");

        RegistryFeature {
            identifier: self.identifier(),
            paths: vec![path],
            feature: Some(feature),
            feature_mut: None,
        }
    }

    /// Mutable registration is not supported for connectors.
    ///
    /// # Panics
    ///
    /// Always panics.
    async fn register_mut(&self, _feature: Arc<Mutex<dyn Feature>>, _base_path: String, _router: &mut product_os_router::ProductOSRouter) -> RegistryFeature {
        panic!("Mutable connector server not allowed to be registered")
    }

    /// Answers every request with `501 Not Implemented` and a body of `{}`.
    ///
    /// The request and version arguments are ignored.
    async fn request(&self, _: Request<Body>, _: String) -> Response<Body> {
        Response::builder()
            .status(StatusCode::NOT_IMPLEMENTED)
            .body(Body::from("{}"))
            .expect("a response built from a constant status and body is always valid")
            .into_response()
    }

    /// Delegates to [`Feature::request`]; no mutable state is touched.
    async fn request_mut(&mut self, request: Request<Body>, version: String) -> Response<Body> {
        self.request(request, version).await
    }
}