use crate::auth::AuthService;
use crate::channel::Channel;
use crate::error::Result;
use crate::rpc::pb::v3electionpb::election_client::ElectionClient as PbElectionClient;
use crate::rpc::pb::v3electionpb::{
CampaignRequest as PbCampaignRequest, CampaignResponse as PbCampaignResponse,
LeaderKey as PbLeaderKey, LeaderRequest as PbLeaderRequest, LeaderResponse as PbLeaderResponse,
ProclaimRequest as PbProclaimRequest, ProclaimResponse as PbProclaimResponse,
ResignRequest as PbResignRequest, ResignResponse as PbResignResponse,
};
use crate::rpc::{KeyValue, ResponseHeader};
use http::HeaderValue;
use std::sync::RwLock;
use std::task::{Context, Poll};
use std::{pin::Pin, sync::Arc};
use tokio_stream::Stream;
use tonic::{IntoRequest, Request, Streaming};
/// Client handle for the election RPC service.
///
/// Wraps the generated `PbElectionClient`, parameterised over an
/// `AuthService<Channel>` transport.
#[repr(transparent)]
#[derive(Clone)]
pub struct ElectionClient {
// Generated gRPC client that the methods of this type delegate to.
inner: PbElectionClient<AuthService<Channel>>,
}
/// Parameters of a campaign request.
///
/// Transparent wrapper around the generated `PbCampaignRequest` message.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct CampaignOptions(PbCampaignRequest);

impl CampaignOptions {
    /// Creates options with an empty name, an empty value and a lease of `0`.
    #[inline]
    pub const fn new() -> Self {
        let request = PbCampaignRequest {
            name: Vec::new(),
            lease: 0,
            value: Vec::new(),
        };
        Self(request)
    }

    /// Replaces the `name` field of the request with `name`.
    #[inline]
    fn with_name(self, name: impl Into<Vec<u8>>) -> Self {
        let Self(mut request) = self;
        request.name = name.into();
        Self(request)
    }

    /// Replaces the `lease` field of the request with `lease`.
    #[inline]
    const fn with_lease(mut self, lease: i64) -> Self {
        // Assign in place and hand `self` back whole so this stays usable in const context.
        self.0.lease = lease;
        self
    }

    /// Replaces the `value` field of the request with `value`.
    #[inline]
    fn with_value(self, value: impl Into<Vec<u8>>) -> Self {
        let Self(mut request) = self;
        request.value = value.into();
        Self(request)
    }
}
impl From<CampaignOptions> for PbCampaignRequest {
#[inline]
fn from(options: CampaignOptions) -> Self {
options.0
}
}
impl IntoRequest<PbCampaignRequest> for CampaignOptions {
#[inline]
fn into_request(self) -> Request<PbCampaignRequest> {
Request::new(self.into())
}
}
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct ProclaimOptions(PbProclaimRequest);
impl ProclaimOptions {
#[inline]
pub const fn new() -> Self {
Self(PbProclaimRequest {
leader: None,
value: Vec::new(),
})
}
#[inline]
fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
self.0.value = value.into();
self
}
#[inline]
pub fn with_leader(mut self, leader: LeaderKey) -> Self {
self.0.leader = Some(leader.into());
self
}
}
impl From<ProclaimOptions> for PbProclaimRequest {
#[inline]
fn from(options: ProclaimOptions) -> Self {
options.0
}
}
impl IntoRequest<PbProclaimRequest> for ProclaimOptions {
#[inline]
fn into_request(self) -> Request<PbProclaimRequest> {
Request::new(self.into())
}
}
/// Parameters of a leader (or observe) request.
///
/// Transparent wrapper around the generated `PbLeaderRequest` message.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct LeaderOptions(PbLeaderRequest);

impl LeaderOptions {
    /// Creates options with an empty name.
    #[inline]
    pub const fn new() -> Self {
        let request = PbLeaderRequest { name: Vec::new() };
        Self(request)
    }

    /// Replaces the `name` field of the request with `name`.
    #[inline]
    pub fn with_name(self, name: impl Into<Vec<u8>>) -> Self {
        let Self(mut request) = self;
        request.name = name.into();
        Self(request)
    }
}
impl From<LeaderOptions> for PbLeaderRequest {
#[inline]
fn from(options: LeaderOptions) -> Self {
options.0
}
}
impl IntoRequest<PbLeaderRequest> for LeaderOptions {
#[inline]
fn into_request(self) -> Request<PbLeaderRequest> {
Request::new(self.into())
}
}
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct ResignOptions(PbResignRequest);
impl ResignOptions {
#[inline]
pub const fn new() -> Self {
Self(PbResignRequest { leader: None })
}
#[inline]
pub fn with_leader(mut self, leader: LeaderKey) -> Self {
self.0.leader = Some(leader.into());
self
}
}
impl From<ResignOptions> for PbResignRequest {
#[inline]
fn from(options: ResignOptions) -> Self {
options.0
}
}
impl IntoRequest<PbResignRequest> for ResignOptions {
#[inline]
fn into_request(self) -> Request<PbResignRequest> {
Request::new(self.into())
}
}
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct CampaignResponse(PbCampaignResponse);
impl CampaignResponse {
#[inline]
const fn new(resp: PbCampaignResponse) -> Self {
Self(resp)
}
#[inline]
pub fn header(&self) -> Option<&ResponseHeader> {
self.0.header.as_ref().map(From::from)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn leader(&self) -> Option<&LeaderKey> {
self.0.leader.as_ref().map(From::from)
}
#[inline]
pub fn take_leader(&mut self) -> Option<LeaderKey> {
self.0.leader.take().map(From::from)
}
}
/// Response to a proclaim request.
///
/// Transparent wrapper around the generated `PbProclaimResponse` message.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct ProclaimResponse(PbProclaimResponse);

impl ProclaimResponse {
    /// Wraps a raw proclaim response message.
    #[inline]
    const fn new(resp: PbProclaimResponse) -> Self {
        Self(resp)
    }

    /// Borrows the response header, if the message carries one.
    #[inline]
    pub fn header(&self) -> Option<&ResponseHeader> {
        let header = self.0.header.as_ref()?;
        Some(header.into())
    }

    /// Moves the response header out of the message, leaving `None` in its place.
    #[inline]
    pub fn take_header(&mut self) -> Option<ResponseHeader> {
        let header = self.0.header.take()?;
        Some(ResponseHeader::new(header))
    }
}
/// Response to a leader request, also the item type yielded by an observe stream.
///
/// Transparent wrapper around the generated `PbLeaderResponse` message.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct LeaderResponse(PbLeaderResponse);

impl LeaderResponse {
    /// Wraps a raw leader response message.
    #[inline]
    const fn new(resp: PbLeaderResponse) -> Self {
        Self(resp)
    }

    /// Borrows the response header, if the message carries one.
    #[inline]
    pub fn header(&self) -> Option<&ResponseHeader> {
        let header = self.0.header.as_ref()?;
        Some(header.into())
    }

    /// Moves the response header out of the message, leaving `None` in its place.
    #[inline]
    pub fn take_header(&mut self) -> Option<ResponseHeader> {
        let header = self.0.header.take()?;
        Some(ResponseHeader::new(header))
    }

    /// Borrows the key-value pair, if the message carries one.
    #[inline]
    pub fn kv(&self) -> Option<&KeyValue> {
        let kv = self.0.kv.as_ref()?;
        Some(kv.into())
    }

    /// Moves the key-value pair out of the message, leaving `None` in its place.
    #[inline]
    pub fn take_kv(&mut self) -> Option<KeyValue> {
        let kv = self.0.kv.take()?;
        Some(KeyValue::new(kv))
    }
}
/// Stream of leader responses returned by an observe call.
///
/// Wraps the tonic `Streaming<PbLeaderResponse>` and converts each message into a
/// [`LeaderResponse`].
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug)]
pub struct ObserveStream {
    stream: Streaming<PbLeaderResponse>,
}

impl ObserveStream {
    /// Wraps a raw response stream.
    #[inline]
    const fn new(stream: Streaming<PbLeaderResponse>) -> Self {
        Self { stream }
    }

    /// Fetches the next message from the underlying stream.
    ///
    /// Returns `Ok(None)` when the underlying stream returns no message.
    ///
    /// # Errors
    ///
    /// Propagates the error reported by the underlying stream, converted into the
    /// crate's error type.
    #[inline]
    pub async fn message(&mut self) -> Result<Option<LeaderResponse>> {
        let next = self.stream.message().await?;
        Ok(next.map(LeaderResponse::new))
    }
}
impl Stream for ObserveStream {
    type Item = Result<LeaderResponse>;

    /// Polls the wrapped stream, wrapping each message in a [`LeaderResponse`] and
    /// converting each error into the crate's error type.
    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let inner = Pin::new(&mut self.get_mut().stream);
        match inner.poll_next(cx) {
            Poll::Ready(Some(Ok(resp))) => Poll::Ready(Some(Ok(LeaderResponse::new(resp)))),
            Poll::Ready(Some(Err(status))) => Poll::Ready(Some(Err(From::from(status)))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Response to a resign request.
///
/// Transparent wrapper around the generated `PbResignResponse` message.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct ResignResponse(PbResignResponse);

impl ResignResponse {
    /// Wraps a raw resign response message.
    #[inline]
    const fn new(resp: PbResignResponse) -> Self {
        Self(resp)
    }

    /// Borrows the response header, if the message carries one.
    #[inline]
    pub fn header(&self) -> Option<&ResponseHeader> {
        let header = self.0.header.as_ref()?;
        Some(header.into())
    }

    /// Moves the response header out of the message, leaving `None` in its place.
    #[inline]
    pub fn take_header(&mut self) -> Option<ResponseHeader> {
        let header = self.0.header.take()?;
        Some(ResponseHeader::new(header))
    }
}
/// Leader key carried in election requests and responses.
///
/// Transparent wrapper around the generated `PbLeaderKey` message, which holds a
/// `name`, a `key`, a revision (`rev`) and a `lease`.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct LeaderKey(PbLeaderKey);

impl LeaderKey {
    /// Creates a leader key with empty `name` and `key`, and `rev` and `lease` set to `0`.
    #[inline]
    pub const fn new() -> Self {
        let raw = PbLeaderKey {
            name: Vec::new(),
            key: Vec::new(),
            rev: 0,
            lease: 0,
        };
        Self(raw)
    }

    /// Replaces the `name` field with `name`.
    #[inline]
    pub fn with_name(self, name: impl Into<Vec<u8>>) -> Self {
        let Self(mut raw) = self;
        raw.name = name.into();
        Self(raw)
    }

    /// Replaces the `key` field with `key`.
    #[inline]
    pub fn with_key(self, key: impl Into<Vec<u8>>) -> Self {
        let Self(mut raw) = self;
        raw.key = key.into();
        Self(raw)
    }

    /// Replaces the `rev` field with `rev`.
    #[inline]
    pub const fn with_rev(mut self, rev: i64) -> Self {
        // Assign in place and hand `self` back whole so this stays usable in const context.
        self.0.rev = rev;
        self
    }

    /// Replaces the `lease` field with `lease`.
    #[inline]
    pub const fn with_lease(mut self, lease: i64) -> Self {
        // Assign in place and hand `self` back whole so this stays usable in const context.
        self.0.lease = lease;
        self
    }

    /// Returns the `name` field as raw bytes.
    #[inline]
    pub fn name(&self) -> &[u8] {
        self.0.name.as_slice()
    }

    /// Returns the `name` field as a string slice.
    ///
    /// # Errors
    ///
    /// Returns an error if the name is not valid UTF-8.
    #[inline]
    pub fn name_str(&self) -> Result<&str> {
        let text = std::str::from_utf8(self.name())?;
        Ok(text)
    }

    /// Returns the `name` field as a string slice without checking that it is UTF-8.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the name bytes are valid UTF-8; otherwise the
    /// returned `&str` violates the invariants of `str`.
    #[inline]
    pub unsafe fn name_str_unchecked(&self) -> &str {
        // SAFETY: the caller upholds the UTF-8 requirement stated in this function's contract.
        unsafe { std::str::from_utf8_unchecked(self.name()) }
    }

    /// Returns the `key` field as raw bytes.
    #[inline]
    pub fn key(&self) -> &[u8] {
        self.0.key.as_slice()
    }

    /// Returns the `key` field as a string slice.
    ///
    /// # Errors
    ///
    /// Returns an error if the key is not valid UTF-8.
    #[inline]
    pub fn key_str(&self) -> Result<&str> {
        let text = std::str::from_utf8(self.key())?;
        Ok(text)
    }

    /// Returns the `key` field as a string slice without checking that it is UTF-8.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the key bytes are valid UTF-8; otherwise the
    /// returned `&str` violates the invariants of `str`.
    #[inline]
    pub unsafe fn key_str_unchecked(&self) -> &str {
        // SAFETY: the caller upholds the UTF-8 requirement stated in this function's contract.
        unsafe { std::str::from_utf8_unchecked(self.key()) }
    }

    /// Returns the `rev` field.
    #[inline]
    pub const fn rev(&self) -> i64 {
        self.0.rev
    }

    /// Returns the `lease` field.
    #[inline]
    pub const fn lease(&self) -> i64 {
        self.0.lease
    }
}
impl Default for LeaderKey {
#[inline]
fn default() -> Self {
Self::new()
}
}
impl From<LeaderKey> for PbLeaderKey {
#[inline]
fn from(leader_key: LeaderKey) -> Self {
leader_key.0
}
}
impl From<PbLeaderKey> for LeaderKey {
#[inline]
fn from(key: PbLeaderKey) -> Self {
Self(key)
}
}
/// Reinterprets a borrowed raw leader key message as a borrowed [`LeaderKey`].
///
/// The input and output references share the lifetime `'a`. With both lifetimes
/// elided in the impl header they would be independent, and because the body goes
/// through a raw pointer, safe callers could obtain a `LeaderKey` reference that
/// outlives the message it points to.
impl<'a> From<&'a PbLeaderKey> for &'a LeaderKey {
    #[inline]
    fn from(src: &'a PbLeaderKey) -> Self {
        // SAFETY: `LeaderKey` is `#[repr(transparent)]` over a single `PbLeaderKey`
        // field, so both types have identical layout and the pointer cast is valid.
        // The returned reference is bounded by `'a`, the lifetime of `src`.
        unsafe { &*(src as *const PbLeaderKey).cast::<LeaderKey>() }
    }
}
impl ElectionClient {
#[inline]
pub(crate) fn new(channel: Channel, auth_token: Arc<RwLock<Option<HeaderValue>>>) -> Self {
let inner = PbElectionClient::new(AuthService::new(channel, auth_token));
Self { inner }
}
#[inline]
pub async fn campaign(
&mut self,
name: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
lease: i64,
) -> Result<CampaignResponse> {
let resp = self
.inner
.campaign(
CampaignOptions::new()
.with_name(name)
.with_value(value)
.with_lease(lease),
)
.await?
.into_inner();
Ok(CampaignResponse::new(resp))
}
#[inline]
pub async fn proclaim(
&mut self,
value: impl Into<Vec<u8>>,
options: Option<ProclaimOptions>,
) -> Result<ProclaimResponse> {
let resp = self
.inner
.proclaim(options.unwrap_or_default().with_value(value))
.await?
.into_inner();
Ok(ProclaimResponse::new(resp))
}
#[inline]
pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
let resp = self
.inner
.leader(LeaderOptions::new().with_name(name))
.await?
.into_inner();
Ok(LeaderResponse::new(resp))
}
#[inline]
pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
let resp = self
.inner
.observe(LeaderOptions::new().with_name(name))
.await?
.into_inner();
Ok(ObserveStream::new(resp))
}
#[inline]
pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
let resp = self
.inner
.resign(option.unwrap_or_default())
.await?
.into_inner();
Ok(ResignResponse::new(resp))
}
}