/// Errors produced by the [`Worker`] client methods.
///
/// The `StatusNNN` variants correspond to the HTTP status codes that
/// `WorkerLive` decodes explicitly; each carries the fields taken from the
/// JSON body of that response (504 carries none).
#[derive(Debug)]
pub enum WorkerError {
/// Any `reqwest::Error`: building the client, sending the request, or
/// decoding a response body failed.
RequestFailure(reqwest::Error),
/// The supplied `authorization` string could not be turned into an HTTP
/// header value.
InvalidHeaderValue(reqwest::header::InvalidHeaderValue),
/// The server answered with a status code that is not one of
/// 200, 400, 401, 403, 404, 409, 500 or 504.
UnexpectedStatus(reqwest::StatusCode),
/// HTTP 504; maps to `WorkerEndpointError::GatewayTimeout`.
Status504,
/// HTTP 404; maps to `WorkerEndpointError::NotFound`.
Status404 {
error: String,
},
/// HTTP 403; maps to `WorkerEndpointError::LimitExceeded`.
Status403 {
error: String,
},
/// HTTP 400; maps to `WorkerEndpointError::BadRequest`.
Status400 {
errors: Vec<String>,
},
/// HTTP 500; maps to `WorkerEndpointError::Golem`.
Status500 {
golem_error: crate::model::GolemError,
},
/// HTTP 401; maps to `WorkerEndpointError::Unauthorized`.
Status401 {
error: String,
},
/// HTTP 409; maps to `WorkerEndpointError::AlreadyExists`.
Status409 {
error: String,
},
}
impl From<reqwest::Error> for WorkerError {
fn from(error: reqwest::Error) -> WorkerError {
WorkerError::RequestFailure(error)
}
}
/// Lets `?` lift a header-value parse failure into
/// [`WorkerError::InvalidHeaderValue`].
impl From<reqwest::header::InvalidHeaderValue> for WorkerError {
    fn from(source: reqwest::header::InvalidHeaderValue) -> Self {
        Self::InvalidHeaderValue(source)
    }
}
impl WorkerError {
pub fn to_worker_endpoint_error(&self) -> Option<crate::model::WorkerEndpointError> {
match self {
WorkerError::Status500 { golem_error } => Some(crate::model::WorkerEndpointError::Golem { golem_error: golem_error.clone() }),
WorkerError::Status404 { error } => Some(crate::model::WorkerEndpointError::NotFound { error: error.clone() }),
WorkerError::Status504 => Some(crate::model::WorkerEndpointError::GatewayTimeout {}),
WorkerError::Status403 { error } => Some(crate::model::WorkerEndpointError::LimitExceeded { error: error.clone() }),
WorkerError::Status409 { error } => Some(crate::model::WorkerEndpointError::AlreadyExists { error: error.clone() }),
WorkerError::Status401 { error } => Some(crate::model::WorkerEndpointError::Unauthorized { error: error.clone() }),
WorkerError::Status400 { errors } => Some(crate::model::WorkerEndpointError::BadRequest { errors: errors.clone() }),
_ => None
}
}
}
/// JSON body decoded from an HTTP 404 response; its `error` field is moved
/// into `WorkerError::Status404`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct WorkerEndpointErrorNotFoundPayload {
pub error: String,
}
/// JSON body decoded from an HTTP 403 response; its `error` field is moved
/// into `WorkerError::Status403`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct WorkerEndpointErrorLimitExceededPayload {
pub error: String,
}
/// JSON body decoded from an HTTP 400 response; its `errors` list is moved
/// into `WorkerError::Status400`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct WorkerEndpointErrorBadRequestPayload {
pub errors: Vec<String>,
}
/// JSON body decoded from an HTTP 500 response; its `golem_error` field is
/// moved into `WorkerError::Status500`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct WorkerEndpointErrorGolemPayload {
// The JSON key is camelCase (`golemError`), unlike the Rust field name.
#[serde(rename = "golemError")]
pub golem_error: crate::model::GolemError,
}
/// JSON body decoded from an HTTP 401 response; its `error` field is moved
/// into `WorkerError::Status401`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct WorkerEndpointErrorUnauthorizedPayload {
pub error: String,
}
/// JSON body decoded from an HTTP 409 response; its `error` field is moved
/// into `WorkerError::Status409`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct WorkerEndpointErrorAlreadyExistsPayload {
pub error: String,
}
/// Asynchronous client interface for the worker endpoints.
///
/// Every method takes the raw `authorization` header value and reports
/// failures as [`WorkerError`]. The routes mentioned below are the ones used
/// by the `WorkerLive` implementation in this file.
#[async_trait::async_trait]
pub trait Worker {
/// Looks up a worker by id (`GET v1/templates/workers/{worker_id}`).
async fn get_worker_by_id(&self, worker_id: &str, authorization: &str) -> Result<crate::model::VersionedWorkerId, WorkerError>;
/// Creates a worker for a template (`POST v1/templates/{template_id}/workers`);
/// `field0` is sent as the JSON request body.
async fn launch_new_worker(&self, template_id: &str, field0: crate::model::WorkerCreationRequest, authorization: &str) -> Result<crate::model::VersionedWorkerId, WorkerError>;
/// Deletes a worker (`DELETE v1/templates/{template_id}/workers/{worker_name}`).
async fn delete_worker(&self, template_id: &str, worker_name: &str, authorization: &str) -> Result<(), WorkerError>;
/// Requests an invocation key (`POST .../workers/{worker_name}/key`).
async fn get_invocation_key(&self, template_id: &str, worker_name: &str, authorization: &str) -> Result<crate::model::InvocationKey, WorkerError>;
/// Invokes a function and returns its result
/// (`POST .../workers/{worker_name}/invoke-and-await`).
///
/// `invocation_key`, `function` and, when present, `calling_convention`
/// are sent as query parameters; `field0` is the JSON request body.
async fn invoke_and_await_function(&self, template_id: &str, worker_name: &str, invocation_key: &str, function: &str, calling_convention: Option<&str>, field0: crate::model::InvokeParameters, authorization: &str) -> Result<crate::model::InvokeResult, WorkerError>;
/// Invokes a function without returning a result
/// (`POST .../workers/{worker_name}/invoke`); `function` is a query
/// parameter and `field0` is the JSON request body.
async fn invoke_function(&self, template_id: &str, worker_name: &str, function: &str, field0: crate::model::InvokeParameters, authorization: &str) -> Result<(), WorkerError>;
/// Completes a promise (`POST .../workers/{worker_name}/complete`) and
/// returns the boolean decoded from the response body.
async fn complete_promise(&self, template_id: &str, worker_name: &str, field0: crate::model::CompleteParameters, authorization: &str) -> Result<bool, WorkerError>;
/// Interrupts a worker (`POST .../workers/{worker_name}/interrupt`);
/// `recover_immediately`, when present, is sent as the
/// `recover-immediately` query parameter.
async fn interrupt_worker(&self, template_id: &str, worker_name: &str, recover_immediately: Option<bool>, authorization: &str) -> Result<(), WorkerError>;
/// Fetches worker metadata (`GET v1/templates/{template_id}/workers/{worker_name}`).
async fn get_worker_metadata(&self, template_id: &str, worker_name: &str, authorization: &str) -> Result<crate::model::WorkerMetadata, WorkerError>;
}
/// [`Worker`] implementation that performs real HTTP requests with `reqwest`.
#[derive(Clone, Debug)]
pub struct WorkerLive {
/// URL to which the `v1/templates/...` path segments of each request are
/// appended.
pub base_url: reqwest::Url,
/// When `true`, the HTTP client is built with
/// `danger_accept_invalid_certs(true)`.
pub allow_insecure: bool,
}
/// Private request-building and response-decoding helpers shared by every
/// [`Worker`] method of [`WorkerLive`].
impl WorkerLive {
    /// Returns `base_url` with `v1`, `templates` and then each of `segments`
    /// appended as individual path segments.
    ///
    /// # Panics
    ///
    /// Panics if `base_url` is a URL that cannot have path segments.
    fn templates_url(&self, segments: &[&str]) -> reqwest::Url {
        let mut url = self.base_url.clone();
        {
            // Scoped so the mutable path view is released before `url` is returned.
            let mut path = url
                .path_segments_mut()
                .expect("WorkerLive::base_url must be a URL that can have path segments");
            path.push("v1").push("templates");
            for segment in segments {
                path.push(segment);
            }
        }
        url
    }

    /// Builds the header map carrying only the `authorization` header.
    ///
    /// Returns `WorkerError::InvalidHeaderValue` when `authorization` is not
    /// a valid header value.
    fn authorization_headers(authorization: &str) -> Result<reqwest::header::HeaderMap, WorkerError> {
        let mut headers = reqwest::header::HeaderMap::new();
        headers.append("authorization", reqwest::header::HeaderValue::from_str(authorization)?);
        Ok(headers)
    }

    /// Passes every header through `crate::hide_authorization` to obtain the
    /// representation written to the log.
    fn loggable_headers(headers: &reqwest::header::HeaderMap) -> Vec<(&str, String)> {
        headers
            .iter()
            .map(|(name, value)| crate::hide_authorization(name, value))
            .collect()
    }

    /// Serializes a request body to JSON for logging.
    ///
    /// A serialization failure yields a placeholder string rather than a
    /// panic: logging must not abort the request. The same failure is then
    /// reported as `WorkerError::RequestFailure` when the request is sent.
    fn loggable_body<T: serde::Serialize>(body: &T) -> String {
        serde_json::to_string(body)
            .unwrap_or_else(|error| format!("<unserializable body: {error}>"))
    }

    /// Builds a fresh HTTP client, accepting invalid TLS certificates only
    /// when `allow_insecure` is set.
    fn http_client(&self) -> Result<reqwest::Client, WorkerError> {
        let mut builder = reqwest::Client::builder();
        if self.allow_insecure {
            builder = builder.danger_accept_invalid_certs(true);
        }
        Ok(builder.build()?)
    }

    /// Turns a non-success response into the matching [`WorkerError`].
    ///
    /// 504 carries no body. 400, 401, 403, 404, 409 and 500 are decoded from
    /// their JSON payloads. Any other status becomes
    /// `WorkerError::UnexpectedStatus`. If a payload cannot be decoded, the
    /// decoding error is returned as `WorkerError::RequestFailure`.
    async fn failure_from(response: reqwest::Response) -> WorkerError {
        let status = response.status();
        let decoded: Result<WorkerError, reqwest::Error> = match status.as_u16() {
            504 => Ok(WorkerError::Status504),
            404 => response
                .json::<WorkerEndpointErrorNotFoundPayload>()
                .await
                .map(|payload| WorkerError::Status404 { error: payload.error }),
            403 => response
                .json::<WorkerEndpointErrorLimitExceededPayload>()
                .await
                .map(|payload| WorkerError::Status403 { error: payload.error }),
            400 => response
                .json::<WorkerEndpointErrorBadRequestPayload>()
                .await
                .map(|payload| WorkerError::Status400 { errors: payload.errors }),
            500 => response
                .json::<WorkerEndpointErrorGolemPayload>()
                .await
                .map(|payload| WorkerError::Status500 { golem_error: payload.golem_error }),
            401 => response
                .json::<WorkerEndpointErrorUnauthorizedPayload>()
                .await
                .map(|payload| WorkerError::Status401 { error: payload.error }),
            409 => response
                .json::<WorkerEndpointErrorAlreadyExistsPayload>()
                .await
                .map(|payload| WorkerError::Status409 { error: payload.error }),
            _ => Ok(WorkerError::UnexpectedStatus(status)),
        };
        decoded.unwrap_or_else(WorkerError::RequestFailure)
    }
}

/// HTTP implementation of [`Worker`].
///
/// Every method follows the same sequence: build the URL, build the
/// `authorization` header, log the outgoing request, build a client, send,
/// then map the status code. Status 200 is the only success; everything else
/// is converted by `WorkerLive::failure_from`.
#[async_trait::async_trait]
impl Worker for WorkerLive {
    /// `GET v1/templates/workers/{worker_id}`; decodes a `VersionedWorkerId`
    /// from a 200 response.
    async fn get_worker_by_id(&self, worker_id: &str, authorization: &str) -> Result<crate::model::VersionedWorkerId, WorkerError> {
        let url = self.templates_url(&["workers", worker_id]);
        let headers = Self::authorization_headers(authorization)?;
        {
            let headers_vec = Self::loggable_headers(&headers);
            tracing::info!(method="get", url=url.to_string(), headers=?headers_vec, body="<no_body>", "get_worker_by_id");
        }
        let client = self.http_client()?;
        let result = client.get(url).headers(headers).send().await?;
        match result.status().as_u16() {
            200 => Ok(result.json::<crate::model::VersionedWorkerId>().await?),
            _ => Err(Self::failure_from(result).await),
        }
    }

    /// `POST v1/templates/{template_id}/workers` with `field0` as the JSON
    /// body; decodes a `VersionedWorkerId` from a 200 response.
    async fn launch_new_worker(&self, template_id: &str, field0: crate::model::WorkerCreationRequest, authorization: &str) -> Result<crate::model::VersionedWorkerId, WorkerError> {
        let url = self.templates_url(&[template_id, "workers"]);
        let headers = Self::authorization_headers(authorization)?;
        {
            let headers_vec = Self::loggable_headers(&headers);
            tracing::info!(method="post", url=url.to_string(), headers=?headers_vec, body=WorkerLive::loggable_body(&field0), "launch_new_worker");
        }
        let client = self.http_client()?;
        let result = client.post(url).headers(headers).json(&field0).send().await?;
        match result.status().as_u16() {
            200 => Ok(result.json::<crate::model::VersionedWorkerId>().await?),
            _ => Err(Self::failure_from(result).await),
        }
    }

    /// `DELETE v1/templates/{template_id}/workers/{worker_name}`; a 200
    /// response yields `Ok(())` without reading the body.
    async fn delete_worker(&self, template_id: &str, worker_name: &str, authorization: &str) -> Result<(), WorkerError> {
        let url = self.templates_url(&[template_id, "workers", worker_name]);
        let headers = Self::authorization_headers(authorization)?;
        {
            let headers_vec = Self::loggable_headers(&headers);
            tracing::info!(method="delete", url=url.to_string(), headers=?headers_vec, body="<no_body>", "delete_worker");
        }
        let client = self.http_client()?;
        let result = client.delete(url).headers(headers).send().await?;
        match result.status().as_u16() {
            200 => Ok(()),
            _ => Err(Self::failure_from(result).await),
        }
    }

    /// `POST v1/templates/{template_id}/workers/{worker_name}/key` without a
    /// body; decodes an `InvocationKey` from a 200 response.
    async fn get_invocation_key(&self, template_id: &str, worker_name: &str, authorization: &str) -> Result<crate::model::InvocationKey, WorkerError> {
        let url = self.templates_url(&[template_id, "workers", worker_name, "key"]);
        let headers = Self::authorization_headers(authorization)?;
        {
            let headers_vec = Self::loggable_headers(&headers);
            tracing::info!(method="post", url=url.to_string(), headers=?headers_vec, body="<no_body>", "get_invocation_key");
        }
        let client = self.http_client()?;
        let result = client.post(url).headers(headers).send().await?;
        match result.status().as_u16() {
            200 => Ok(result.json::<crate::model::InvocationKey>().await?),
            _ => Err(Self::failure_from(result).await),
        }
    }

    /// `POST v1/templates/{template_id}/workers/{worker_name}/invoke-and-await`
    /// with `field0` as the JSON body.
    ///
    /// Query parameters, in order: `invocation-key`, `function` and, when
    /// `calling_convention` is `Some`, `calling-convention`. Decodes an
    /// `InvokeResult` from a 200 response.
    async fn invoke_and_await_function(&self, template_id: &str, worker_name: &str, invocation_key: &str, function: &str, calling_convention: Option<&str>, field0: crate::model::InvokeParameters, authorization: &str) -> Result<crate::model::InvokeResult, WorkerError> {
        let mut url = self.templates_url(&[template_id, "workers", worker_name, "invoke-and-await"]);
        url.query_pairs_mut().append_pair("invocation-key", invocation_key);
        url.query_pairs_mut().append_pair("function", function);
        if let Some(value) = calling_convention {
            url.query_pairs_mut().append_pair("calling-convention", value);
        }
        let headers = Self::authorization_headers(authorization)?;
        {
            let headers_vec = Self::loggable_headers(&headers);
            tracing::info!(method="post", url=url.to_string(), headers=?headers_vec, body=WorkerLive::loggable_body(&field0), "invoke_and_await_function");
        }
        let client = self.http_client()?;
        let result = client.post(url).headers(headers).json(&field0).send().await?;
        match result.status().as_u16() {
            200 => Ok(result.json::<crate::model::InvokeResult>().await?),
            _ => Err(Self::failure_from(result).await),
        }
    }

    /// `POST v1/templates/{template_id}/workers/{worker_name}/invoke` with
    /// `function` as a query parameter and `field0` as the JSON body; a 200
    /// response yields `Ok(())` without reading the body.
    async fn invoke_function(&self, template_id: &str, worker_name: &str, function: &str, field0: crate::model::InvokeParameters, authorization: &str) -> Result<(), WorkerError> {
        let mut url = self.templates_url(&[template_id, "workers", worker_name, "invoke"]);
        url.query_pairs_mut().append_pair("function", function);
        let headers = Self::authorization_headers(authorization)?;
        {
            let headers_vec = Self::loggable_headers(&headers);
            tracing::info!(method="post", url=url.to_string(), headers=?headers_vec, body=WorkerLive::loggable_body(&field0), "invoke_function");
        }
        let client = self.http_client()?;
        let result = client.post(url).headers(headers).json(&field0).send().await?;
        match result.status().as_u16() {
            200 => Ok(()),
            _ => Err(Self::failure_from(result).await),
        }
    }

    /// `POST v1/templates/{template_id}/workers/{worker_name}/complete` with
    /// `field0` as the JSON body; decodes a `bool` from a 200 response.
    async fn complete_promise(&self, template_id: &str, worker_name: &str, field0: crate::model::CompleteParameters, authorization: &str) -> Result<bool, WorkerError> {
        let url = self.templates_url(&[template_id, "workers", worker_name, "complete"]);
        let headers = Self::authorization_headers(authorization)?;
        {
            let headers_vec = Self::loggable_headers(&headers);
            tracing::info!(method="post", url=url.to_string(), headers=?headers_vec, body=WorkerLive::loggable_body(&field0), "complete_promise");
        }
        let client = self.http_client()?;
        let result = client.post(url).headers(headers).json(&field0).send().await?;
        match result.status().as_u16() {
            200 => Ok(result.json::<bool>().await?),
            _ => Err(Self::failure_from(result).await),
        }
    }

    /// `POST v1/templates/{template_id}/workers/{worker_name}/interrupt`
    /// without a body; `recover_immediately`, when `Some`, is sent as the
    /// `recover-immediately` query parameter (`true` / `false`). A 200
    /// response yields `Ok(())` without reading the body.
    async fn interrupt_worker(&self, template_id: &str, worker_name: &str, recover_immediately: Option<bool>, authorization: &str) -> Result<(), WorkerError> {
        let mut url = self.templates_url(&[template_id, "workers", worker_name, "interrupt"]);
        if let Some(value) = recover_immediately {
            url.query_pairs_mut().append_pair("recover-immediately", &value.to_string());
        }
        let headers = Self::authorization_headers(authorization)?;
        {
            let headers_vec = Self::loggable_headers(&headers);
            tracing::info!(method="post", url=url.to_string(), headers=?headers_vec, body="<no_body>", "interrupt_worker");
        }
        let client = self.http_client()?;
        let result = client.post(url).headers(headers).send().await?;
        match result.status().as_u16() {
            200 => Ok(()),
            _ => Err(Self::failure_from(result).await),
        }
    }

    /// `GET v1/templates/{template_id}/workers/{worker_name}`; decodes a
    /// `WorkerMetadata` from a 200 response.
    async fn get_worker_metadata(&self, template_id: &str, worker_name: &str, authorization: &str) -> Result<crate::model::WorkerMetadata, WorkerError> {
        let url = self.templates_url(&[template_id, "workers", worker_name]);
        let headers = Self::authorization_headers(authorization)?;
        {
            let headers_vec = Self::loggable_headers(&headers);
            tracing::info!(method="get", url=url.to_string(), headers=?headers_vec, body="<no_body>", "get_worker_metadata");
        }
        let client = self.http_client()?;
        let result = client.get(url).headers(headers).send().await?;
        match result.status().as_u16() {
            200 => Ok(result.json::<crate::model::WorkerMetadata>().await?),
            _ => Err(Self::failure_from(result).await),
        }
    }
}