c2pa_crypto/cose/
sign1.rsuse async_generic::async_generic;
use c2pa_status_tracker::{log_item, validation_codes::CLAIM_SIGNATURE_MISMATCH, StatusTracker};
use ciborium::value::Value;
use coset::{
iana::{self, Algorithm, EnumI64},
CoseSign1, Label, RegisteredLabelWithPrivate, TaggedCborSerializable,
};
use crate::{
cose::{validate_cose_tst_info, validate_cose_tst_info_async, CoseError},
raw_signature::SigningAlg,
};
pub fn parse_cose_sign1(
cose_bytes: &[u8],
data: &[u8],
validation_log: &mut impl StatusTracker,
) -> Result<CoseSign1, CoseError> {
let mut sign1 = <coset::CoseSign1 as TaggedCborSerializable>::from_tagged_slice(cose_bytes)
.map_err(|coset_error| {
log_item!(
"Cose_Sign1",
"could not parse signature",
"parse_cose_sign1"
)
.validation_status(CLAIM_SIGNATURE_MISMATCH)
.failure_no_throw(
validation_log,
CoseError::CborParsingError(coset_error.to_string()),
);
CoseError::CborParsingError(coset_error.to_string())
})?;
sign1.payload = Some(data.to_vec());
Ok(sign1)
}
pub fn signing_alg_from_sign1(sign1: &coset::CoseSign1) -> Result<SigningAlg, CoseError> {
let Some(ref alg) = sign1.protected.header.alg else {
return Err(CoseError::UnsupportedSigningAlgorithm);
};
match alg {
RegisteredLabelWithPrivate::PrivateUse(a) => match a {
-39 => Ok(SigningAlg::Ps512),
-38 => Ok(SigningAlg::Ps384),
-37 => Ok(SigningAlg::Ps256),
-36 => Ok(SigningAlg::Es512),
-35 => Ok(SigningAlg::Es384),
-7 => Ok(SigningAlg::Es256),
-8 => Ok(SigningAlg::Ed25519),
_ => Err(CoseError::UnsupportedSigningAlgorithm),
},
RegisteredLabelWithPrivate::Assigned(a) => match a {
Algorithm::PS512 => Ok(SigningAlg::Ps512),
Algorithm::PS384 => Ok(SigningAlg::Ps384),
Algorithm::PS256 => Ok(SigningAlg::Ps256),
Algorithm::ES512 => Ok(SigningAlg::Es512),
Algorithm::ES384 => Ok(SigningAlg::Es384),
Algorithm::ES256 => Ok(SigningAlg::Es256),
Algorithm::EdDSA => Ok(SigningAlg::Ed25519),
_ => Err(CoseError::UnsupportedSigningAlgorithm),
},
_ => Err(CoseError::UnsupportedSigningAlgorithm),
}
}
pub fn cert_chain_from_sign1(sign1: &coset::CoseSign1) -> Result<Vec<Vec<u8>>, CoseError> {
let Some(value) = sign1
.protected
.header
.rest
.iter()
.find_map(|x: &(Label, Value)| {
if x.0 == Label::Text("x5chain".to_string())
|| x.0 == Label::Int(iana::HeaderParameter::X5Chain.to_i64())
{
Some(x.1.clone())
} else {
None
}
})
else {
return get_unprotected_header_certs(sign1);
};
if get_unprotected_header_certs(sign1).is_ok() {
return Err(CoseError::MultipleSigningCertificateChains);
}
cert_chain_from_cbor_value(value)
}
fn get_unprotected_header_certs(sign1: &coset::CoseSign1) -> Result<Vec<Vec<u8>>, CoseError> {
let Some(value) = sign1
.unprotected
.rest
.iter()
.find_map(|x: &(Label, Value)| {
if x.0 == Label::Text("x5chain".to_string()) {
Some(x.1.clone())
} else {
None
}
})
else {
return Err(CoseError::MissingSigningCertificateChain);
};
cert_chain_from_cbor_value(value)
}
fn cert_chain_from_cbor_value(value: Value) -> Result<Vec<Vec<u8>>, CoseError> {
match value {
Value::Array(cert_chain) => {
let certs: Vec<Vec<u8>> = cert_chain
.iter()
.filter_map(|c| {
if let Value::Bytes(der_bytes) = c {
Some(der_bytes.clone())
} else {
None
}
})
.collect();
if certs.is_empty() {
Err(CoseError::MissingSigningCertificateChain)
} else {
Ok(certs)
}
}
Value::Bytes(ref der_bytes) => Ok(vec![der_bytes.clone()]),
_ => Err(CoseError::MissingSigningCertificateChain),
}
}
#[async_generic]
pub fn signing_time_from_sign1(
sign1: &coset::CoseSign1,
data: &[u8],
) -> Option<chrono::DateTime<chrono::Utc>> {
let time_stamp_info = if _sync {
validate_cose_tst_info(sign1, data)
} else {
validate_cose_tst_info_async(sign1, data).await
};
if let Ok(tst_info) = time_stamp_info {
Some(tst_info.gen_time.into())
} else {
None
}
}