pub use crate::rpc::pb::etcdserverpb::compare::CompareResult as CompareOp;
pub use crate::rpc::pb::etcdserverpb::range_request::{SortOrder, SortTarget};
use crate::auth::AuthService;
use crate::channel::Channel;
use crate::error::Result;
use crate::rpc::pb::etcdserverpb::compare::{CompareTarget, TargetUnion};
use crate::rpc::pb::etcdserverpb::kv_client::KvClient as PbKvClient;
use crate::rpc::pb::etcdserverpb::request_op::Request as PbTxnOp;
use crate::rpc::pb::etcdserverpb::response_op::Response as PbTxnOpResponse;
use crate::rpc::pb::etcdserverpb::{
CompactionRequest as PbCompactionRequest, CompactionRequest,
CompactionResponse as PbCompactionResponse, Compare as PbCompare,
DeleteRangeRequest as PbDeleteRequest, DeleteRangeRequest,
DeleteRangeResponse as PbDeleteResponse, PutRequest as PbPutRequest,
PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
RequestOp as PbTxnRequestOp, TxnRequest as PbTxnRequest, TxnResponse as PbTxnResponse,
};
use crate::rpc::{get_prefix, KeyRange, KeyValue, ResponseHeader};
use crate::vec::VecExt;
use http::HeaderValue;
use std::mem::ManuallyDrop;
use std::sync::{Arc, RwLock};
use tonic::{IntoRequest, Request};
/// Client for the etcd key-value API.
///
/// Wraps the generated gRPC KV stub; the stub's transport is an
/// [`AuthService`] layered on top of a [`Channel`].
#[repr(transparent)]
#[derive(Clone)]
pub struct KvClient {
// Generated gRPC client that every method of this type delegates to.
inner: PbKvClient<AuthService<Channel>>,
}
impl KvClient {
    /// Creates a KV client that sends its requests over `channel`.
    ///
    /// The channel is wrapped in an [`AuthService`] built from the shared
    /// `auth_token` handle before being handed to the generated gRPC client.
    #[inline]
    pub(crate) fn new(channel: Channel, auth_token: Arc<RwLock<Option<HeaderValue>>>) -> Self {
        let service = AuthService::new(channel, auth_token);
        Self {
            inner: PbKvClient::new(service),
        }
    }

    /// Forwards `limit` to the inner gRPC client's `max_decoding_message_size`
    /// setting and returns the reconfigured client.
    pub fn max_decoding_message_size(self, limit: usize) -> Self {
        let Self { inner } = self;
        Self {
            inner: inner.max_decoding_message_size(limit),
        }
    }

    /// Forwards `limit` to the inner gRPC client's `max_encoding_message_size`
    /// setting and returns the reconfigured client.
    pub fn max_encoding_message_size(self, limit: usize) -> Self {
        let Self { inner } = self;
        Self {
            inner: inner.max_encoding_message_size(limit),
        }
    }

    /// Sends a put request for `key` / `value`.
    ///
    /// `options` defaults to [`PutOptions::default`] when `None`; the key and
    /// value are written into the request before it is sent.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying RPC call.
    #[inline]
    pub async fn put(
        &mut self,
        key: impl Into<Vec<u8>>,
        value: impl Into<Vec<u8>>,
        options: Option<PutOptions>,
    ) -> Result<PutResponse> {
        let request = options.unwrap_or_default().with_kv(key, value);
        let reply = self.inner.put(request).await?;
        Ok(PutResponse::new(reply.into_inner()))
    }

    /// Sends a range request starting at `key`.
    ///
    /// `options` defaults to [`GetOptions::default`] when `None`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying RPC call.
    #[inline]
    pub async fn get(
        &mut self,
        key: impl Into<Vec<u8>>,
        options: Option<GetOptions>,
    ) -> Result<GetResponse> {
        let base = options.unwrap_or_default();
        let start: Vec<u8> = key.into();
        let reply = self.inner.range(base.with_key(start)).await?;
        Ok(GetResponse::new(reply.into_inner()))
    }

    /// Sends a delete-range request starting at `key`.
    ///
    /// `options` defaults to [`DeleteOptions::default`] when `None`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying RPC call.
    #[inline]
    pub async fn delete(
        &mut self,
        key: impl Into<Vec<u8>>,
        options: Option<DeleteOptions>,
    ) -> Result<DeleteResponse> {
        let base = options.unwrap_or_default();
        let start: Vec<u8> = key.into();
        let reply = self.inner.delete_range(base.with_key(start)).await?;
        Ok(DeleteResponse::new(reply.into_inner()))
    }

    /// Sends a compaction request for `revision`.
    ///
    /// `options` defaults to [`CompactionOptions::default`] when `None`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying RPC call.
    #[inline]
    pub async fn compact(
        &mut self,
        revision: i64,
        options: Option<CompactionOptions>,
    ) -> Result<CompactionResponse> {
        let request = options.unwrap_or_default().with_revision(revision);
        let reply = self.inner.compact(request).await?;
        Ok(CompactionResponse::new(reply.into_inner()))
    }

    /// Sends the transaction described by `txn`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying RPC call.
    #[inline]
    pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
        let reply = self.inner.txn(txn).await?;
        Ok(TxnResponse::new(reply.into_inner()))
    }
}
/// Options for a put request.
///
/// Newtype over the protobuf put request; the builder methods on this type
/// fill in its fields.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct PutOptions(PbPutRequest);
impl PutOptions {
#[inline]
fn with_kv(mut self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Self {
self.0.key = key.into();
self.0.value = value.into();
self
}
#[inline]
pub const fn new() -> Self {
Self(PbPutRequest {
key: Vec::new(),
value: Vec::new(),
lease: 0,
prev_kv: false,
ignore_value: false,
ignore_lease: false,
})
}
#[inline]
pub const fn with_lease(mut self, lease: i64) -> Self {
self.0.lease = lease;
self
}
#[inline]
pub const fn with_prev_key(mut self) -> Self {
self.0.prev_kv = true;
self
}
#[inline]
pub const fn with_ignore_value(mut self) -> Self {
self.0.ignore_value = true;
self
}
#[inline]
pub const fn with_ignore_lease(mut self) -> Self {
self.0.ignore_lease = true;
self
}
}
impl From<PutOptions> for PbPutRequest {
#[inline]
fn from(options: PutOptions) -> Self {
options.0
}
}
impl IntoRequest<PbPutRequest> for PutOptions {
#[inline]
fn into_request(self) -> Request<PbPutRequest> {
Request::new(self.into())
}
}
/// Response to a put request.
///
/// Newtype over the protobuf put response. When the `pub-response-field`
/// feature is enabled the `visible::StructFields(pub)` attribute is applied.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct PutResponse(PbPutResponse);
impl PutResponse {
    /// Wraps a raw protobuf put response.
    #[inline]
    const fn new(resp: PbPutResponse) -> Self {
        Self(resp)
    }

    /// Borrows the response header, if the response carries one.
    #[inline]
    pub fn header(&self) -> Option<&ResponseHeader> {
        let raw = self.0.header.as_ref();
        raw.map(From::from)
    }

    /// Moves the response header out, leaving `None` in its place.
    #[inline]
    pub fn take_header(&mut self) -> Option<ResponseHeader> {
        let raw = self.0.header.take()?;
        Some(ResponseHeader::new(raw))
    }

    /// Borrows the `prev_kv` entry of the response, if present.
    #[inline]
    pub fn prev_key(&self) -> Option<&KeyValue> {
        let raw = self.0.prev_kv.as_ref();
        raw.map(From::from)
    }

    /// Moves the `prev_kv` entry out, leaving `None` in its place.
    #[inline]
    pub fn take_prev_key(&mut self) -> Option<KeyValue> {
        let raw = self.0.prev_kv.take()?;
        Some(KeyValue::new(raw))
    }

    /// Applies `strip_key_prefix(prefix)` to the key of the `prev_kv` entry
    /// when one is present; does nothing otherwise.
    #[inline]
    pub(crate) fn strip_prev_key_prefix(&mut self, prefix: &[u8]) {
        if let Some(previous) = &mut self.0.prev_kv {
            previous.key.strip_key_prefix(prefix);
        }
    }
}
/// Options for a get (range) request.
#[derive(Debug, Default, Clone)]
pub struct GetOptions {
// Protobuf request under construction; `key` and `range_end` are filled in
// from `key_range` when the options are converted into the request.
req: PbRangeRequest,
// Key / range selection, resolved by `KeyRange::build` at conversion time.
key_range: KeyRange,
}
impl GetOptions {
    /// Records the key of the request in the pending key range.
    #[inline]
    fn with_key(self, key: impl Into<Vec<u8>>) -> Self {
        let mut options = self;
        options.key_range.with_key(key);
        options
    }

    /// Creates options with every request field at its empty / zero value and
    /// a fresh [`KeyRange`].
    #[inline]
    pub const fn new() -> Self {
        // Every field is spelled out so the constructor can stay `const`.
        Self {
            req: PbRangeRequest {
                key: Vec::new(),
                range_end: Vec::new(),
                limit: 0,
                revision: 0,
                sort_order: 0,
                sort_target: 0,
                serializable: false,
                keys_only: false,
                count_only: false,
                min_mod_revision: 0,
                max_mod_revision: 0,
                min_create_revision: 0,
                max_create_revision: 0,
            },
            key_range: KeyRange::new(),
        }
    }

    /// Passes `end_key` to [`KeyRange::with_range`] on the pending key range.
    #[inline]
    pub fn with_range(self, end_key: impl Into<Vec<u8>>) -> Self {
        let mut options = self;
        options.key_range.with_range(end_key);
        options
    }

    /// Calls [`KeyRange::with_from_key`] on the pending key range.
    #[inline]
    pub fn with_from_key(self) -> Self {
        let mut options = self;
        options.key_range.with_from_key();
        options
    }

    /// Calls [`KeyRange::with_prefix`] on the pending key range.
    #[inline]
    pub fn with_prefix(self) -> Self {
        let mut options = self;
        options.key_range.with_prefix();
        options
    }

    /// Calls [`KeyRange::with_all_keys`] on the pending key range.
    #[inline]
    pub fn with_all_keys(self) -> Self {
        let mut options = self;
        options.key_range.with_all_keys();
        options
    }

    /// Sets the request's `limit` field.
    #[inline]
    pub const fn with_limit(self, limit: i64) -> Self {
        let mut options = self;
        options.req.limit = limit;
        options
    }

    /// Sets the request's `revision` field.
    #[inline]
    pub const fn with_revision(self, revision: i64) -> Self {
        let mut options = self;
        options.req.revision = revision;
        options
    }

    /// Sets the sort target and order of the request.
    ///
    /// The combination `SortTarget::Key` + `SortOrder::Ascend` is sent as
    /// `SortOrder::None`; every other combination is sent as given.
    #[inline]
    pub fn with_sort(mut self, target: SortTarget, order: SortOrder) -> Self {
        let ascending_by_key = target == SortTarget::Key && order == SortOrder::Ascend;
        let effective_order = if ascending_by_key {
            SortOrder::None
        } else {
            order
        };
        self.req.sort_order = effective_order as i32;
        self.req.sort_target = target as i32;
        self
    }

    /// Turns on the request's `serializable` flag.
    #[inline]
    pub const fn with_serializable(self) -> Self {
        let mut options = self;
        options.req.serializable = true;
        options
    }

    /// Turns on the request's `keys_only` flag.
    #[inline]
    pub const fn with_keys_only(self) -> Self {
        let mut options = self;
        options.req.keys_only = true;
        options
    }

    /// Turns on the request's `count_only` flag.
    #[inline]
    pub const fn with_count_only(self) -> Self {
        let mut options = self;
        options.req.count_only = true;
        options
    }

    /// Sets the request's `min_mod_revision` field.
    #[inline]
    pub const fn with_min_mod_revision(self, revision: i64) -> Self {
        let mut options = self;
        options.req.min_mod_revision = revision;
        options
    }

    /// Sets the request's `max_mod_revision` field.
    #[inline]
    pub const fn with_max_mod_revision(self, revision: i64) -> Self {
        let mut options = self;
        options.req.max_mod_revision = revision;
        options
    }

    /// Sets the request's `min_create_revision` field.
    #[inline]
    pub const fn with_min_create_revision(self, revision: i64) -> Self {
        let mut options = self;
        options.req.min_create_revision = revision;
        options
    }

    /// Sets the request's `max_create_revision` field.
    #[inline]
    pub const fn with_max_create_revision(self, revision: i64) -> Self {
        let mut options = self;
        options.req.max_create_revision = revision;
        options
    }

    /// Gives mutable access to the `range_end` bytes of the pending key range.
    #[inline]
    pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
        let range = &mut self.key_range;
        &mut range.range_end
    }
}
impl From<GetOptions> for PbRangeRequest {
#[inline]
fn from(mut options: GetOptions) -> Self {
let (key, rang_end) = options.key_range.build();
options.req.key = key;
options.req.range_end = rang_end;
options.req
}
}
impl IntoRequest<PbRangeRequest> for GetOptions {
#[inline]
fn into_request(self) -> Request<PbRangeRequest> {
Request::new(self.into())
}
}
/// Response to a get (range) request.
///
/// Newtype over the protobuf range response. When the `pub-response-field`
/// feature is enabled the `visible::StructFields(pub)` attribute is applied.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct GetResponse(PbRangeResponse);
impl GetResponse {
#[inline]
const fn new(resp: PbRangeResponse) -> Self {
Self(resp)
}
#[inline]
pub fn header(&self) -> Option<&ResponseHeader> {
self.0.header.as_ref().map(From::from)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn kvs(&self) -> &[KeyValue] {
unsafe { &*(self.0.kvs.as_slice() as *const _ as *const [KeyValue]) }
}
#[inline]
pub fn take_kvs(&mut self) -> Vec<KeyValue> {
let kvs = ManuallyDrop::new(std::mem::take(&mut self.0.kvs));
unsafe { Vec::from_raw_parts(kvs.as_ptr() as *mut KeyValue, kvs.len(), kvs.capacity()) }
}
#[inline]
pub(crate) fn strip_kvs_prefix(&mut self, prefix: &[u8]) {
for kv in self.0.kvs.iter_mut() {
kv.key.strip_key_prefix(prefix);
}
}
#[inline]
pub const fn more(&self) -> bool {
self.0.more
}
#[inline]
pub const fn count(&self) -> i64 {
self.0.count
}
}
/// Options for a delete (delete-range) request.
#[derive(Debug, Default, Clone)]
pub struct DeleteOptions {
// Protobuf request under construction; `key` and `range_end` are filled in
// from `key_range` when the options are converted into the request.
req: PbDeleteRequest,
// Key / range selection, resolved by `KeyRange::build` at conversion time.
key_range: KeyRange,
}
impl DeleteOptions {
    /// Records the key of the request in the pending key range.
    #[inline]
    fn with_key(self, key: impl Into<Vec<u8>>) -> Self {
        let mut options = self;
        options.key_range.with_key(key);
        options
    }

    /// Creates options with every request field at its empty / zero value and
    /// a fresh [`KeyRange`].
    #[inline]
    pub const fn new() -> Self {
        // Every field is spelled out so the constructor can stay `const`.
        Self {
            req: PbDeleteRequest {
                key: Vec::new(),
                range_end: Vec::new(),
                prev_kv: false,
            },
            key_range: KeyRange::new(),
        }
    }

    /// Passes `end_key` to [`KeyRange::with_range`] on the pending key range.
    #[inline]
    pub fn with_range(self, end_key: impl Into<Vec<u8>>) -> Self {
        let mut options = self;
        options.key_range.with_range(end_key);
        options
    }

    /// Calls [`KeyRange::with_from_key`] on the pending key range.
    #[inline]
    pub fn with_from_key(self) -> Self {
        let mut options = self;
        options.key_range.with_from_key();
        options
    }

    /// Calls [`KeyRange::with_prefix`] on the pending key range.
    #[inline]
    pub fn with_prefix(self) -> Self {
        let mut options = self;
        options.key_range.with_prefix();
        options
    }

    /// Calls [`KeyRange::with_all_keys`] on the pending key range.
    #[inline]
    pub fn with_all_keys(self) -> Self {
        let mut options = self;
        options.key_range.with_all_keys();
        options
    }

    /// Turns on the request's `prev_kv` flag.
    #[inline]
    pub const fn with_prev_key(self) -> Self {
        let mut options = self;
        options.req.prev_kv = true;
        options
    }

    /// Gives mutable access to the `range_end` bytes of the pending key range.
    #[inline]
    pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
        let range = &mut self.key_range;
        &mut range.range_end
    }
}
impl From<DeleteOptions> for PbDeleteRequest {
#[inline]
fn from(mut options: DeleteOptions) -> Self {
let (key, rang_end) = options.key_range.build();
options.req.key = key;
options.req.range_end = rang_end;
options.req
}
}
impl IntoRequest<PbDeleteRequest> for DeleteOptions {
#[inline]
fn into_request(self) -> Request<DeleteRangeRequest> {
Request::new(self.into())
}
}
/// Response to a delete (delete-range) request.
///
/// Newtype over the protobuf delete-range response. When the
/// `pub-response-field` feature is enabled the `visible::StructFields(pub)`
/// attribute is applied.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct DeleteResponse(PbDeleteResponse);
impl DeleteResponse {
#[inline]
const fn new(resp: PbDeleteResponse) -> Self {
Self(resp)
}
#[inline]
pub fn header(&self) -> Option<&ResponseHeader> {
self.0.header.as_ref().map(From::from)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub const fn deleted(&self) -> i64 {
self.0.deleted
}
#[inline]
pub fn prev_kvs(&self) -> &[KeyValue] {
unsafe { &*(self.0.prev_kvs.as_slice() as *const _ as *const [KeyValue]) }
}
#[inline]
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
let kvs = ManuallyDrop::new(std::mem::take(&mut self.0.prev_kvs));
unsafe { Vec::from_raw_parts(kvs.as_ptr() as *mut KeyValue, kvs.len(), kvs.capacity()) }
}
#[inline]
pub(crate) fn strip_prev_kvs_prefix(&mut self, prefix: &[u8]) {
for kv in self.0.prev_kvs.iter_mut() {
kv.key.strip_key_prefix(prefix);
}
}
}
/// Options for a compaction request.
///
/// Newtype over the protobuf compaction request.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct CompactionOptions(PbCompactionRequest);
impl CompactionOptions {
    /// Creates options with `revision` set to 0 and `physical` set to false.
    #[inline]
    pub const fn new() -> Self {
        // Every field is spelled out so the constructor can stay `const`.
        Self(PbCompactionRequest {
            revision: 0,
            physical: false,
        })
    }

    /// Sets the request's `revision` field.
    #[inline]
    const fn with_revision(self, revision: i64) -> Self {
        let mut options = self;
        options.0.revision = revision;
        options
    }

    /// Turns on the request's `physical` flag.
    #[inline]
    pub const fn with_physical(self) -> Self {
        let mut options = self;
        options.0.physical = true;
        options
    }
}
impl IntoRequest<PbCompactionRequest> for CompactionOptions {
#[inline]
fn into_request(self) -> Request<CompactionRequest> {
Request::new(self.0)
}
}
/// Response to a compaction request.
///
/// Newtype over the protobuf compaction response. When the
/// `pub-response-field` feature is enabled the `visible::StructFields(pub)`
/// attribute is applied.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct CompactionResponse(PbCompactionResponse);
impl CompactionResponse {
    /// Wraps a raw protobuf compaction response.
    #[inline]
    const fn new(resp: PbCompactionResponse) -> Self {
        Self(resp)
    }

    /// Borrows the response header, if the response carries one.
    #[inline]
    pub fn header(&self) -> Option<&ResponseHeader> {
        let raw = self.0.header.as_ref();
        raw.map(From::from)
    }

    /// Moves the response header out, leaving `None` in its place.
    #[inline]
    pub fn take_header(&mut self) -> Option<ResponseHeader> {
        let raw = self.0.header.take()?;
        Some(ResponseHeader::new(raw))
    }
}
/// A single comparison used as a transaction guard.
///
/// Newtype over the protobuf compare message. `#[repr(transparent)]` gives it
/// the same layout as the wrapped message, which `Txn::when` relies on.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Compare(PbCompare);
impl Compare {
#[inline]
fn new(
key: impl Into<Vec<u8>>,
cmp: CompareOp,
target: CompareTarget,
target_union: TargetUnion,
) -> Self {
Self(PbCompare {
result: cmp as i32,
target: target as i32,
key: key.into(),
range_end: Vec::new(),
target_union: Some(target_union),
})
}
#[inline]
pub fn version(key: impl Into<Vec<u8>>, cmp: CompareOp, version: i64) -> Self {
Self::new(
key,
cmp,
CompareTarget::Version,
TargetUnion::Version(version),
)
}
#[inline]
pub fn create_revision(key: impl Into<Vec<u8>>, cmp: CompareOp, revision: i64) -> Self {
Self::new(
key,
cmp,
CompareTarget::Create,
TargetUnion::CreateRevision(revision),
)
}
#[inline]
pub fn mod_revision(key: impl Into<Vec<u8>>, cmp: CompareOp, revision: i64) -> Self {
Self::new(
key,
cmp,
CompareTarget::Mod,
TargetUnion::ModRevision(revision),
)
}
#[inline]
pub fn value(key: impl Into<Vec<u8>>, cmp: CompareOp, value: impl Into<Vec<u8>>) -> Self {
Self::new(
key,
cmp,
CompareTarget::Value,
TargetUnion::Value(value.into()),
)
}
#[inline]
pub fn lease(key: impl Into<Vec<u8>>, cmp: CompareOp, lease: i64) -> Self {
Self::new(key, cmp, CompareTarget::Lease, TargetUnion::Lease(lease))
}
#[inline]
pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
self.0.range_end = end.into();
self
}
#[inline]
pub fn with_prefix(mut self) -> Self {
self.0.range_end = get_prefix(&self.0.key);
self
}
}
/// A single operation (put, get, delete or nested transaction) that can be
/// placed in a transaction branch.
///
/// Newtype over the protobuf request-op `oneof`.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct TxnOp(PbTxnOp);
impl TxnOp {
#[inline]
pub fn put(
key: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
options: Option<PutOptions>,
) -> Self {
TxnOp(PbTxnOp::RequestPut(
options.unwrap_or_default().with_kv(key, value).into(),
))
}
#[inline]
pub fn get(key: impl Into<Vec<u8>>, options: Option<GetOptions>) -> Self {
TxnOp(PbTxnOp::RequestRange(
options.unwrap_or_default().with_key(key).into(),
))
}
#[inline]
pub fn delete(key: impl Into<Vec<u8>>, options: Option<DeleteOptions>) -> Self {
TxnOp(PbTxnOp::RequestDeleteRange(
options.unwrap_or_default().with_key(key).into(),
))
}
#[inline]
pub fn txn(txn: Txn) -> Self {
TxnOp(PbTxnOp::RequestTxn(txn.into()))
}
}
impl From<TxnOp> for PbTxnOp {
#[inline]
fn from(op: TxnOp) -> Self {
op.0
}
}
/// Builder for a transaction request.
///
/// The `c_*` flags record which builder steps have already run so the
/// builder methods can assert on call order.
#[derive(Debug, Default, Clone)]
pub struct Txn {
// Protobuf transaction request under construction.
req: PbTxnRequest,
// Set once `when` has been called.
c_when: bool,
// Set once `and_then` has been called.
c_then: bool,
// Set once `or_else` has been called.
c_else: bool,
}
impl Txn {
#[inline]
pub const fn new() -> Self {
Self {
req: PbTxnRequest {
compare: Vec::new(),
success: Vec::new(),
failure: Vec::new(),
},
c_when: false,
c_then: false,
c_else: false,
}
}
#[inline]
pub fn when(mut self, compares: impl Into<Vec<Compare>>) -> Self {
assert!(!self.c_when, "cannot call when twice");
assert!(!self.c_then, "cannot call when after and_then");
assert!(!self.c_else, "cannot call when after or_else");
self.c_when = true;
let compares = ManuallyDrop::new(compares.into());
self.req.compare = unsafe {
Vec::from_raw_parts(
compares.as_ptr() as *mut PbCompare,
compares.len(),
compares.capacity(),
)
};
self
}
#[inline]
pub fn and_then(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
assert!(!self.c_then, "cannot call and_then twice");
assert!(!self.c_else, "cannot call and_then after or_else");
self.c_then = true;
self.req.success = operations
.into()
.into_iter()
.map(|op| PbTxnRequestOp {
request: Some(op.into()),
})
.collect();
self
}
#[inline]
pub fn or_else(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
assert!(!self.c_else, "cannot call or_else twice");
self.c_else = true;
self.req.failure = operations
.into()
.into_iter()
.map(|op| PbTxnRequestOp {
request: Some(op.into()),
})
.collect();
self
}
#[inline]
pub(crate) fn prefix_with(&mut self, prefix: &[u8]) {
self.req.prefix_with(prefix);
}
}
impl PbTxnRequest {
    /// Applies `prefix` to every key carried by the transaction.
    ///
    /// Keys get `prefix_with(prefix)` and range ends get
    /// `prefix_range_end_with(prefix)`. Comparisons are handled first, then
    /// the `success` operations, then the `failure` operations. Nested
    /// transactions are processed recursively.
    fn prefix_with(&mut self, prefix: &[u8]) {
        for cmp in &mut self.compare {
            cmp.key.prefix_with(prefix);
            cmp.range_end.prefix_range_end_with(prefix);
        }

        let operations = self.success.iter_mut().chain(self.failure.iter_mut());
        for op in operations {
            match op.request.as_mut() {
                Some(PbTxnOp::RequestRange(range)) => {
                    range.key.prefix_with(prefix);
                    range.range_end.prefix_range_end_with(prefix);
                }
                Some(PbTxnOp::RequestPut(put)) => {
                    // Put requests carry only a key, no range end.
                    put.key.prefix_with(prefix);
                }
                Some(PbTxnOp::RequestDeleteRange(delete)) => {
                    delete.key.prefix_with(prefix);
                    delete.range_end.prefix_range_end_with(prefix);
                }
                Some(PbTxnOp::RequestTxn(nested)) => {
                    nested.prefix_with(prefix);
                }
                // An operation without a request has nothing to prefix.
                None => {}
            }
        }
    }
}
impl From<Txn> for PbTxnRequest {
#[inline]
fn from(txn: Txn) -> Self {
txn.req
}
}
impl IntoRequest<PbTxnRequest> for Txn {
#[inline]
fn into_request(self) -> Request<PbTxnRequest> {
Request::new(self.into())
}
}
/// Response of a single operation executed inside a transaction.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
pub enum TxnOpResponse {
/// Response of a put operation.
Put(PutResponse),
/// Response of a get (range) operation.
Get(GetResponse),
/// Response of a delete-range operation.
Delete(DeleteResponse),
/// Response of a nested transaction.
Txn(TxnResponse),
}
/// Response to a transaction request.
///
/// Newtype over the protobuf transaction response. When the
/// `pub-response-field` feature is enabled the `visible::StructFields(pub)`
/// attribute is applied.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct TxnResponse(PbTxnResponse);
impl TxnResponse {
#[inline]
const fn new(resp: PbTxnResponse) -> Self {
Self(resp)
}
#[inline]
pub fn header(&self) -> Option<&ResponseHeader> {
self.0.header.as_ref().map(From::from)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub const fn succeeded(&self) -> bool {
self.0.succeeded
}
#[inline]
pub fn op_responses(&self) -> Vec<TxnOpResponse> {
self.0
.responses
.iter()
.map(|resp| match resp.response.as_ref().unwrap() {
PbTxnOpResponse::ResponsePut(put) => {
TxnOpResponse::Put(PutResponse::new(put.clone()))
}
PbTxnOpResponse::ResponseRange(get) => {
TxnOpResponse::Get(GetResponse::new(get.clone()))
}
PbTxnOpResponse::ResponseDeleteRange(delete) => {
TxnOpResponse::Delete(DeleteResponse::new(delete.clone()))
}
PbTxnOpResponse::ResponseTxn(txn) => {
TxnOpResponse::Txn(TxnResponse::new(txn.clone()))
}
})
.collect()
}
#[inline]
pub(crate) fn strip_key_prefix(&mut self, prefix: &[u8]) {
self.0.strip_key_prefix(prefix);
}
}
impl PbTxnResponse {
    /// Applies `strip_key_prefix(prefix)` to every key carried by the
    /// per-operation responses, recursing into nested transaction responses.
    /// Entries without a response are skipped.
    fn strip_key_prefix(&mut self, prefix: &[u8]) {
        let present = self
            .responses
            .iter_mut()
            .filter_map(|op| op.response.as_mut());

        for response in present {
            match response {
                PbTxnOpResponse::ResponseRange(range) => {
                    range.kvs.iter_mut().for_each(|kv| {
                        kv.key.strip_key_prefix(prefix);
                    });
                }
                PbTxnOpResponse::ResponsePut(put) => {
                    if let Some(previous) = &mut put.prev_kv {
                        previous.key.strip_key_prefix(prefix);
                    }
                }
                PbTxnOpResponse::ResponseDeleteRange(delete) => {
                    delete.prev_kvs.iter_mut().for_each(|kv| {
                        kv.key.strip_key_prefix(prefix);
                    });
                }
                PbTxnOpResponse::ResponseTxn(nested) => {
                    nested.strip_key_prefix(prefix);
                }
            }
        }
    }
}