zino_auth/
user_session.rsuse super::{AccessKeyId, SessionId};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use zino_core::{
application::{Agent, Application},
crypto::Digest,
};
#[cfg(feature = "jwt")]
use super::JwtClaims;
#[cfg(feature = "jwt")]
use zino_core::{error::Error, extension::JsonObjectExt, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserSession<U, R = String, T = U> {
user_id: U,
session_id: Option<SessionId>,
access_key_id: Option<AccessKeyId>,
roles: Vec<R>,
tenant_id: Option<T>,
}
impl<U, R, T> UserSession<U, R, T> {
#[inline]
pub fn new(user_id: U, session_id: impl Into<Option<SessionId>>) -> Self {
Self {
user_id,
session_id: session_id.into(),
access_key_id: None,
roles: Vec::new(),
tenant_id: None,
}
}
#[inline]
pub fn set_session_id(&mut self, session_id: SessionId) {
self.session_id = Some(session_id);
}
#[inline]
pub fn set_access_key_id(&mut self, access_key_id: AccessKeyId) {
if self.session_id.is_none() {
let session_id = SessionId::new::<Digest>(Agent::domain(), access_key_id.as_ref());
self.session_id = Some(session_id);
}
self.access_key_id = Some(access_key_id);
}
#[inline]
pub fn set_roles(&mut self, roles: impl Into<Vec<R>>) {
self.roles = roles.into();
}
#[inline]
pub fn set_tenant_id(&mut self, tenant_id: T) {
self.tenant_id = Some(tenant_id);
}
#[inline]
pub fn user_id(&self) -> &U {
&self.user_id
}
#[inline]
pub fn tenant_id(&self) -> Option<&T> {
self.tenant_id.as_ref()
}
#[inline]
pub fn session_id(&self) -> Option<&SessionId> {
self.session_id.as_ref()
}
#[inline]
pub fn access_key_id(&self) -> Option<&AccessKeyId> {
self.access_key_id.as_ref()
}
#[inline]
pub fn roles(&self) -> &[R] {
&self.roles
}
}
impl<U, R, T> UserSession<U, R, T>
where
U: FromStr,
R: FromStr,
T: FromStr,
<U as FromStr>::Err: std::error::Error + Send + 'static,
{
#[cfg(feature = "jwt")]
pub fn try_from_jwt_claims(claims: JwtClaims) -> Result<Self, Error> {
let data = claims.data();
let user_id = claims
.subject()
.map(|s| s.into())
.or_else(|| data.parse_string("uid"))
.ok_or_else(|| warn!("the subject of a JWT token should be specified"))?
.parse()?;
let mut user_session = Self::new(user_id, None);
if let Some(Ok(roles)) = data
.parse_array("roles")
.or_else(|| data.parse_array("role"))
{
user_session.set_roles(roles);
}
if let Some(tenant_id) = data
.parse_string("tenant_id")
.or_else(|| data.parse_string("tid"))
.and_then(|s| s.parse().ok())
{
user_session.set_tenant_id(tenant_id);
}
Ok(user_session)
}
}
impl<U, T> UserSession<U, String, T> {
#[inline]
pub fn is_superuser(&self) -> bool {
self.roles() == ["superuser"]
}
#[inline]
pub fn is_user(&self) -> bool {
self.roles() == ["user"]
}
#[inline]
pub fn is_guest(&self) -> bool {
self.roles() == ["guest"]
}
#[inline]
pub fn is_agent(&self) -> bool {
self.roles() == ["agent"]
}
pub fn is_admin(&self) -> bool {
let role = "admin";
let role_prefix = format!("{role}:");
for r in &self.roles {
if r == role || r.starts_with(&role_prefix) {
return true;
}
}
false
}
pub fn is_worker(&self) -> bool {
let role = "worker";
let role_prefix = format!("{role}:");
for r in &self.roles {
if r == role || r.starts_with(&role_prefix) {
return true;
}
}
false
}
pub fn is_auditor(&self) -> bool {
let role = "auditor";
let role_prefix = format!("{role}:");
for r in &self.roles {
if r == role || r.starts_with(&role_prefix) {
return true;
}
}
false
}
pub fn has_user_role(&self) -> bool {
self.is_superuser()
|| self.is_user()
|| self.is_admin()
|| self.is_worker()
|| self.is_auditor()
}
pub fn has_admin_role(&self) -> bool {
self.is_superuser() || self.is_admin()
}
pub fn has_worker_role(&self) -> bool {
self.is_superuser() || self.is_worker()
}
pub fn has_auditor_role(&self) -> bool {
self.is_superuser() || self.is_auditor()
}
pub fn has_role(&self, role: &str) -> bool {
let length = role.len();
for r in &self.roles {
if r == role {
return true;
} else {
let remainder = if r.len() > length {
r.strip_prefix(role)
} else {
role.strip_prefix(r.as_str())
};
if remainder.is_some_and(|s| s.starts_with(':')) {
return true;
}
}
}
false
}
pub fn has_any_roles(&self, roles: &[&str]) -> bool {
for role in roles {
if self.has_role(role) {
return true;
}
}
false
}
pub fn has_all_roles(&self, roles: &[&str]) -> bool {
for role in roles {
if !self.has_role(role) {
return false;
}
}
true
}
}