From 05f5993ab59738f51c6f02e3db2a92a84bd9e77b Mon Sep 17 00:00:00 2001
From: Chase Douglas
Date: Mon, 17 Mar 2025 10:51:21 -0700
Subject: [PATCH] PR improvements

---
 Cargo.lock                  |  2 ++
 Cargo.toml                  |  1 +
 src/api/core/ciphers.rs     |  2 +-
 src/api/core/sends.rs       | 37 +++++++++++++++++++++----------------
 src/api/icons.rs            |  2 +-
 src/config.rs               |  5 +++--
 src/db/models/attachment.rs |  9 ++-------
 src/util.rs                 | 18 ++++++++----------
 8 files changed, 39 insertions(+), 37 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index acfccfa0..8a16dbbd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4905,6 +4905,7 @@ checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
 dependencies = [
  "bytes",
  "futures-core",
+ "futures-io",
  "futures-sink",
  "pin-project-lite",
  "tokio",
@@ -5249,6 +5250,7 @@ dependencies = [
  "syslog",
  "time",
  "tokio",
+ "tokio-util",
  "totp-lite",
  "tracing",
  "url",
diff --git a/Cargo.toml b/Cargo.toml
index e0a5e04c..a871cc73 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -74,6 +74,7 @@ dashmap = "6.1.0"
 # Async futures
 futures = "0.3.31"
 tokio = { version = "1.45.1", features = ["rt-multi-thread", "fs", "io-util", "parking_lot", "time", "signal", "net"] }
+tokio-util = { version = "0.7.15", features = ["compat"]}
 
 # A generic serialization/deserialization framework
 serde = { version = "1.0.219", features = ["derive"] }
diff --git a/src/api/core/ciphers.rs b/src/api/core/ciphers.rs
index 337a26eb..aecbe28a 100644
--- a/src/api/core/ciphers.rs
+++ b/src/api/core/ciphers.rs
@@ -1265,7 +1265,7 @@ async fn save_attachment(
         attachment.save(&mut conn).await.expect("Error saving attachment");
     }
 
-    save_temp_file(PathType::Attachments, &format!("{cipher_id}/{file_id}"), data.data).await?;
+    save_temp_file(PathType::Attachments, &format!("{cipher_id}/{file_id}"), data.data, true).await?;
 
     nt.send_cipher_update(
         UpdateType::SyncCipherUpdate,
diff --git a/src/api/core/sends.rs b/src/api/core/sends.rs
index 14bb05bf..a4210862 100644
--- a/src/api/core/sends.rs
+++ b/src/api/core/sends.rs
@@ -1,3 +1,4 @@
+use std::error::Error as _;
 use std::path::Path;
 use std::time::Duration;
 
@@ -271,7 +272,7 @@ async fn post_send_file(data: Form<UploadData<'_>>, headers: Headers, mut conn:
 
     let file_id = crate::crypto::generate_send_file_id();
 
-    save_temp_file(PathType::Sends, &format!("{}/{file_id}", send.uuid), data).await?;
+    save_temp_file(PathType::Sends, &format!("{}/{file_id}", send.uuid), data, true).await?;
 
     let mut data_value: Value = serde_json::from_str(&send.data)?;
     if let Some(o) = data_value.as_object_mut() {
@@ -421,20 +422,29 @@ async fn post_send_file_v2_data(
         err!("Send file size does not match.", format!("Expected a file size of {} got {size}", send_data.size));
     }
 
-    let operator = CONFIG.opendal_operator_for_path_type(PathType::Sends)?;
     let file_path = format!("{send_id}/{file_id}");
 
-    // Check if the file already exists, if that is the case do not overwrite it
-    if operator.exists(&file_path).await.map_err(|e| {
+    save_temp_file(PathType::Sends, &file_path, data.data, false).await.map_err(|e| {
+        let was_file_exists_error = e
+            .source()
+            .and_then(|e| e.downcast_ref::<std::io::Error>())
+            .and_then(|e| e.get_ref())
+            .and_then(|e| e.downcast_ref::<opendal::Error>())
+            .map(|e| e.kind() == opendal::ErrorKind::ConditionNotMatch)
+            .unwrap_or(false);
+
+        if was_file_exists_error {
+            return crate::Error::new(
+                "Send file has already been uploaded.",
+                format!("File {file_path:?} already exists"),
+            );
+        }
+
         crate::Error::new(
             "Unexpected error while creating send file",
-            format!("Error while checking existence of send file at path {file_path}: {e:?}"),
+            format!("Error while saving send file at path {file_path}: {e:?}"),
         )
-    })? {
-        err!("Send file has already been uploaded.", format!("File {file_path:?} already exists"))
-    }
-
-    save_temp_file(PathType::Sends, &file_path, data.data).await?;
+    })?;
 
     nt.send_send_update(
         UpdateType::SyncSendCreate,
@@ -583,12 +593,7 @@ async fn download_url(host: &Host, send_id: &SendId, file_id: &SendFileId) -> Re
 
         Ok(format!("{}/api/sends/{send_id}/{file_id}?t={token}", &host.host))
     } else {
-        Ok(operator
-            .presign_read(&format!("{send_id}/{file_id}"), Duration::from_secs(5 * 60))
-            .await
-            .map_err(Into::<crate::Error>::into)?
-            .uri()
-            .to_string())
+        Ok(operator.presign_read(&format!("{send_id}/{file_id}"), Duration::from_secs(5 * 60)).await?.uri().to_string())
     }
 }
 
diff --git a/src/api/icons.rs b/src/api/icons.rs
index 903e0040..acf3943e 100644
--- a/src/api/icons.rs
+++ b/src/api/icons.rs
@@ -228,7 +228,7 @@ async fn icon_is_negcached(path: &str) -> bool {
         Ok(true) => {
             match CONFIG.opendal_operator_for_path_type(PathType::IconCache) {
                 Ok(operator) => {
-                    if let Err(e) = operator.delete_iter([miss_indicator]).await {
+                    if let Err(e) = operator.delete(&miss_indicator).await {
                         error!("Could not remove negative cache indicator for icon {path:?}: {e:?}");
                     }
                 }
diff --git a/src/config.rs b/src/config.rs
index e12b25ba..666090fd 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1184,7 +1184,7 @@ fn opendal_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
         opendal_s3_operator_for_path(path)?
     } else {
         let builder = opendal::services::Fs::default().root(path);
-        opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish()
+        opendal::Operator::new(builder)?.finish()
     };
 
     operators_by_path.insert(path.to_string(), operator.clone());
@@ -1233,11 +1233,12 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error>
 
     let builder = opendal::services::S3::default()
         .customized_credential_load(Box::new(OPEN_DAL_S3_CREDENTIAL_LOADER))
+        .enable_virtual_host_style()
         .bucket(bucket)
         .root(url.path())
         .default_storage_class("INTELLIGENT_TIERING");
 
-    Ok(opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish())
+    Ok(opendal::Operator::new(builder)?.finish())
 }
 
 pub enum PathType {
diff --git a/src/db/models/attachment.rs b/src/db/models/attachment.rs
index a7e8d4fb..aafb8766 100644
--- a/src/db/models/attachment.rs
+++ b/src/db/models/attachment.rs
@@ -51,12 +51,7 @@ impl Attachment {
             let token = encode_jwt(&generate_file_download_claims(self.cipher_uuid.clone(), self.id.clone()));
             Ok(format!("{host}/attachments/{}/{}?token={token}", self.cipher_uuid, self.id))
         } else {
-            Ok(operator
-                .presign_read(&self.get_file_path(), Duration::from_secs(5 * 60))
-                .await
-                .map_err(Into::<crate::Error>::into)?
-                .uri()
-                .to_string())
+            Ok(operator.presign_read(&self.get_file_path(), Duration::from_secs(5 * 60)).await?.uri().to_string())
         }
     }
 
@@ -126,7 +121,7 @@ impl Attachment {
         let operator = CONFIG.opendal_operator_for_path_type(PathType::Attachments)?;
         let file_path = self.get_file_path();
 
-        if let Err(e) = operator.delete_iter([file_path.clone()]).await {
+        if let Err(e) = operator.delete(&file_path).await {
             if e.kind() == opendal::ErrorKind::NotFound {
                 debug!("File '{file_path}' already deleted.");
             } else {
diff --git a/src/util.rs b/src/util.rs
index 0d987b8e..c8a86d43 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -828,24 +828,22 @@ pub fn is_global(ip: std::net::IpAddr) -> bool {
 }
 
 /// Saves a Rocket temporary file to the OpenDAL Operator at the given path.
-///
-/// Ideally we would stream the Rocket TempFile directly to the OpenDAL
-/// Operator, but Tempfile exposes a tokio ASyncBufRead trait, which OpenDAL
-/// does not support. This could be reworked in the future to read and write
-/// chunks to reduce copy overhead.
 pub async fn save_temp_file(
     path_type: PathType,
     path: &str,
     temp_file: rocket::fs::TempFile<'_>,
+    overwrite: bool,
 ) -> Result<(), crate::Error> {
-    use tokio::io::AsyncReadExt as _;
+    use futures::AsyncWriteExt as _;
+    use tokio_util::compat::TokioAsyncReadCompatExt as _;
 
     let operator = CONFIG.opendal_operator_for_path_type(path_type)?;
 
-    let mut read_stream = temp_file.open().await?;
-    let mut buf = Vec::with_capacity(temp_file.len() as usize);
-    read_stream.read_to_end(&mut buf).await?;
-    operator.write(path, buf).await?;
+    let mut read_stream = temp_file.open().await?.compat();
+    let mut writer = operator.writer_with(path).if_not_exists(!overwrite).await?.into_futures_async_write();
+    futures::io::copy(&mut read_stream, &mut writer).await?;
+    writer.close().await?;
+
     Ok(())
 }
 
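Note (reviewer aid, not part of the patch): the core of this commit is that `save_temp_file` now streams the Rocket temp file into an OpenDAL writer via `tokio_util::compat` and relies on `if_not_exists` to reject duplicate uploads, which `post_send_file_v2_data` detects by downcasting through `std::io::Error` to `opendal::Error` and checking for `ConditionNotMatch`. The sketch below is a minimal standalone illustration of that pattern under stated assumptions; the helper name `upload_once`, the in-memory backend, and the crate features (`opendal` with `services-memory`, `tokio` with `macros`/`rt-multi-thread`, `tokio-util` with `compat`, plus `futures`) are choices made for the example, and whether `if_not_exists` is actually enforced depends on the backend's conditional-write capability (S3, which the Send endpoint targets, supports it).

```rust
use futures::AsyncWriteExt as _;
use tokio_util::compat::TokioAsyncReadCompatExt as _;

/// Illustrative helper (not Vaultwarden code): stream a tokio `AsyncRead`
/// into an OpenDAL object, refusing to overwrite an existing key.
async fn upload_once(
    op: &opendal::Operator,
    path: &str,
    reader: impl tokio::io::AsyncRead + Unpin,
) -> std::io::Result<()> {
    // Adapt tokio's AsyncRead to the futures-io traits OpenDAL works with.
    let mut reader = reader.compat();

    // `if_not_exists(true)` makes the upload a conditional write, so an
    // existing object surfaces as an error instead of being overwritten.
    let mut writer = op
        .writer_with(path)
        .if_not_exists(true)
        .await
        .map_err(std::io::Error::other)?
        .into_futures_async_write();

    futures::io::copy(&mut reader, &mut writer).await?;
    writer.close().await?;
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // The in-memory backend keeps the sketch self-contained; Vaultwarden uses
    // the Fs or S3 operator returned by `opendal_operator_for_path_type`.
    let op = opendal::Operator::new(opendal::services::Memory::default())
        .map_err(std::io::Error::other)?
        .finish();

    upload_once(&op, "sends/demo", &b"first upload"[..]).await?;

    // A second upload to the same key should fail. The opendal error arrives
    // wrapped in a std::io::Error, so downcast and inspect its kind, mirroring
    // the check the patch adds to `post_send_file_v2_data`.
    let err = upload_once(&op, "sends/demo", &b"second upload"[..]).await.unwrap_err();
    let already_exists = err
        .get_ref()
        .and_then(|e| e.downcast_ref::<opendal::Error>())
        .map(|e| e.kind() == opendal::ErrorKind::ConditionNotMatch)
        .unwrap_or(false);
    println!("duplicate upload rejected: {already_exists}");
    Ok(())
}
```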