From 667815897f7e18d1ac3202035ccad47fc3fa9059 Mon Sep 17 00:00:00 2001 From: Ryan Date: Mon, 5 Aug 2024 11:34:52 +0900 Subject: [PATCH] 0.1.6 --- Cargo.toml | 11 ++-- README.md | 5 +- RELEASE_NOTE.md | 8 ++- src/aws/credential.rs | 13 +++++ src/{ => aws}/kinesis.rs | 5 +- src/aws/mod.rs | 4 ++ src/{ => aws}/parameter_store.rs | 14 +---- src/aws/s3.rs | 84 +++++++++++++++++++++++++++++ src/config.rs | 2 +- src/lib.rs | 5 +- src/log.rs | 26 +-------- src/model/error.rs | 12 +++++ src/model/kinesis_message.rs | 52 ------------------ src/model/kinesis_message_detail.rs | 41 -------------- src/model/mod.rs | 2 - 15 files changed, 137 insertions(+), 147 deletions(-) create mode 100644 src/aws/credential.rs rename src/{ => aws}/kinesis.rs (89%) create mode 100644 src/aws/mod.rs rename src/{ => aws}/parameter_store.rs (73%) create mode 100644 src/aws/s3.rs delete mode 100644 src/model/kinesis_message.rs delete mode 100644 src/model/kinesis_message_detail.rs diff --git a/Cargo.toml b/Cargo.toml index ab95dd2..627b0c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,16 @@ [package] name = "device-common" -version = "0.1.5" +version = "0.1.6" edition = "2021" authors = ["Ryan Bae "] [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -bincode = "1.3" aws-config = { version = "1.5" } -aws-sdk-kinesis = { version = "1.36", features = ["behavior-version-latest"] } -aws-sdk-ssm = "1.40" +aws-sdk-kinesis = { version = "1.3", features = ["behavior-version-latest"] } +aws-sdk-s3 = "1.4" +aws-sdk-ssm = "1.4" aws-types="1.3" aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] } deadpool-postgres = "0.14" @@ -25,8 +25,9 @@ lapin = "2.5" sysinfo = "0.31" prometheus = "0.13" dashmap = "6.0" -serial_test = "3.1" async-trait = "0.1" +bytes = "1.7" +serial_test = "3.1" [profile.dev] opt-level = 0 diff --git a/README.md b/README.md index cee28d4..08586b9 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,9 @@ device-service 에서 공통으로 쓰이는 기능들의 집합 - config.rs : 필수 설정 정보를 AWS Parameter Store 로 부터 읽어온다. - database.rs : pg 관련 기능(connection pool 등) - env.rs : 환경 변수 관련 기능 / 환경 변수별 AWS Parameter Store 에서 쓰일 prefix 를 만든다. -- kinesis.rs : AWS Kinesis Stream 관련 기능 - log.rs : log 관련 기능 / static function 으로 사용 가능한 expect_or_log - metrics.rs : cpu / memory 등 기본 metric 수집 기능 - orchestration.rs : orchestration 정보를 로드하는 기능 -- parameter_store.rs : AWS Parameter Store 관련 기능 \ No newline at end of file +- aws/kinesis.rs : AWS Kinesis Stream 관련 기능 +- aws/parameter_store.rs : AWS Parameter Store 관련 기능 +- aws/s3.rs : AWS S3 관련 기능 \ No newline at end of file diff --git a/RELEASE_NOTE.md b/RELEASE_NOTE.md index 1d17217..aef35d7 100644 --- a/RELEASE_NOTE.md +++ b/RELEASE_NOTE.md @@ -21,4 +21,10 @@ first deploy 0.1.5 ====== -- fix bug for log \ No newline at end of file +- fix bug for log + +0.1.6 +====== +- changed kinesis_message(byte struct) to string(json) +- added 'aws' directory +- added s3.rs \ No newline at end of file diff --git a/src/aws/credential.rs b/src/aws/credential.rs new file mode 100644 index 0000000..b3f2075 --- /dev/null +++ b/src/aws/credential.rs @@ -0,0 +1,13 @@ +use tracing_unwrap::ResultExt; +use crate::constants; + +pub fn get_aws_credential() -> (String, String, String) { + let access_key = crate::env::get(constants::ENV_AWS_ACCESS_KEY) + .expect_or_log("env not found: AWS_ACCESS_KEY"); + let secret_key = crate::env::get(constants::ENV_AWS_SECRET_KEY) + .expect_or_log("env not found: AWS_SECRET_KEY"); + let region = crate::env::get(constants::ENV_AWS_REGION) + .expect_or_log("env not found: AWS_REGION"); + + (access_key, secret_key, region) +} \ No newline at end of file diff --git a/src/kinesis.rs b/src/aws/kinesis.rs similarity index 89% rename from src/kinesis.rs rename to src/aws/kinesis.rs index 9d4ba2c..89c7dda 100644 --- a/src/kinesis.rs +++ b/src/aws/kinesis.rs @@ -5,7 +5,6 @@ use aws_sdk_kinesis::primitives::Blob; use aws_credential_types::Credentials; use tokio::sync::OnceCell; use crate::model::error::DeviceError; -use crate::model::kinesis_message::KinesisMessage; static AWS_KINESIS_CONSUMER_CLIENT: OnceCell> = OnceCell::const_new(); @@ -32,8 +31,8 @@ async fn get_client() -> Arc { .clone() } -pub async fn kinesis_put_record(msg: &KinesisMessage) -> Result<(), DeviceError> { - let blob = msg.encode(); +pub async fn kinesis_put_record(msg: &String) -> Result<(), DeviceError> { + let blob = msg.as_bytes(); let config = crate::config::get_config() .await .clone(); diff --git a/src/aws/mod.rs b/src/aws/mod.rs new file mode 100644 index 0000000..a7644be --- /dev/null +++ b/src/aws/mod.rs @@ -0,0 +1,4 @@ +pub mod s3; +pub mod kinesis; +pub mod parameter_store; +mod credential; \ No newline at end of file diff --git a/src/parameter_store.rs b/src/aws/parameter_store.rs similarity index 73% rename from src/parameter_store.rs rename to src/aws/parameter_store.rs index 4c198f0..b77d153 100644 --- a/src/parameter_store.rs +++ b/src/aws/parameter_store.rs @@ -5,23 +5,11 @@ use aws_sdk_ssm::Client; use aws_types::region::Region; use tokio::sync::OnceCell; use tracing_unwrap::{OptionExt, ResultExt}; -use crate::constants; static AWS_SSM_CLIENT: OnceCell> = OnceCell::const_new(); -fn get_aws_credential() -> (String, String, String) { - let access_key = crate::env::get(constants::ENV_AWS_ACCESS_KEY) - .expect_or_log("env not found: AWS_ACCESS_KEY"); - let secret_key = crate::env::get(constants::ENV_AWS_SECRET_KEY) - .expect_or_log("env not found: AWS_SECRET_KEY"); - let region = crate::env::get(constants::ENV_AWS_REGION) - .expect_or_log("env not found: AWS_REGION"); - - (access_key, secret_key, region) -} - async fn initialize_data() -> Arc { - let (access_key, secret_key, region) = get_aws_credential(); + let (access_key, secret_key, region) = crate::aws::credential::get_aws_credential(); let region_provider = RegionProviderChain::default_provider().or_else(Region::new(region)); let region = region_provider.region().await.unwrap(); diff --git a/src/aws/s3.rs b/src/aws/s3.rs new file mode 100644 index 0000000..ff98bfa --- /dev/null +++ b/src/aws/s3.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use aws_config::meta::region::RegionProviderChain; +use aws_credential_types::Credentials; +use aws_types::region::Region; +use bytes::Bytes; +use tokio::sync::OnceCell; +use tracing::info; +use crate::model::error::DeviceError; + +static AWS_S3_CLIENT: OnceCell> = OnceCell::const_new(); + +async fn initialize_data() -> Arc { + let (access_key, secret_key, region) = crate::aws::credential::get_aws_credential(); + + let region_provider = RegionProviderChain::default_provider().or_else(Region::new(region)); + let region = region_provider.region().await.unwrap(); + + let credentials = Credentials::from_keys( + access_key, + secret_key, + None, + ); + + let config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(region) + .credentials_provider(credentials) + .load() + .await; + + Arc::new(aws_sdk_s3::Client::new(&config)) +} + +async fn get_client() -> Arc { + AWS_S3_CLIENT.get_or_init(initialize_data) + .await + .clone() +} + +pub async fn read_file(bucket: &str, key: &str) -> Result { + let client = get_client().await; + + let res = client + .get_object() + .bucket(bucket) + .key(key) + .send() + .await?; + + let data = res + .body + .collect() + .await?; + + let data: Bytes = data.into_bytes(); + + if data.is_empty() { + return Err(DeviceError::GeneralError("length".to_string())) + } + + Ok(String::from_utf8_lossy(&data).into_owned()) +} + +#[cfg(test)] +mod tests { + use super::*; + use serial_test::serial; + use crate::constants; + + fn setup() { + crate::env::set(constants::ENV_AWS_ACCESS_KEY, "AKIA32XY6L3KQA4QDIAP"); + crate::env::set(constants::ENV_AWS_SECRET_KEY, "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"); + crate::env::set(constants::ENV_AWS_REGION, "ap-northeast-2"); + } + + fn teardown() {} + + #[tokio::test] + #[serial] + async fn test_read_file() { + setup(); + assert!(read_file("apm-bigdata", "flower-device/test.csv").await.is_ok()); + teardown(); + } +} \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 9f84851..3376a56 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use tracing_unwrap::ResultExt; use tokio::sync::OnceCell; use crate::constants; -use crate::parameter_store::get_parameter; +use crate::aws::parameter_store::get_parameter; #[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct Rabbitmq { diff --git a/src/lib.rs b/src/lib.rs index 39c0268..b443434 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,10 +2,9 @@ pub mod config; pub mod model; pub mod database; -pub mod kinesis; pub mod log; pub mod env; pub mod orchestration; pub mod metrics; -pub mod parameter_store; -pub mod constants; \ No newline at end of file +pub mod constants; +mod aws; \ No newline at end of file diff --git a/src/log.rs b/src/log.rs index 83f3fdb..d507fb3 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,11 +1,11 @@ use std::env::current_exe; -use std::{env, fs}; +use std::fs; use std::path::Path; use tracing::{error, Level, subscriber}; use tracing_appender::non_blocking::{WorkerGuard}; use tracing_appender::rolling::{RollingFileAppender, Rotation}; use crate::constants::{FAILOVER_LOG_LEVEL_KEY, GATEWAY_LOG_LEVEL_KEY, NODE_LOG_LEVEL_KEY, TICKET_LOG_LEVEL_KEY}; -use crate::parameter_store::get_parameter; +use crate::aws::parameter_store::get_parameter; fn get_log_level_parameter_key() -> Option { let mut ret: Option = None; @@ -84,26 +84,4 @@ fn get_log_dir() -> String { current_dir.pop(); current_dir.push("log"); current_dir.display().to_string() -} - -#[cfg(test)] -mod tests { - use super::*; - use serial_test::serial; - - fn setup() { - - } - - fn teardown() { - - } - - #[test] - #[serial] - fn test_get_log_level_parameter_key() { - setup(); - assert!(get_log_level_parameter_key().is_none()); - teardown(); - } } \ No newline at end of file diff --git a/src/model/error.rs b/src/model/error.rs index 3329658..ce3c279 100644 --- a/src/model/error.rs +++ b/src/model/error.rs @@ -1,4 +1,7 @@ use aws_sdk_kinesis::operation::put_record::PutRecordError; +use aws_sdk_s3::operation::get_object::GetObjectError; +use aws_sdk_s3::operation::list_buckets::ListBucketsError; +use aws_sdk_s3::primitives::ByteStreamError; use aws_sdk_ssm::operation::get_parameter::GetParameterError; use thiserror::Error; use tokio_postgres::Error as PostgresError; @@ -29,6 +32,15 @@ pub enum DeviceError { #[error("Parameter Store error: {0}")] GetParameterError(#[from] aws_sdk_ssm::error::SdkError), + #[error("S3 error: {0}")] + ListBucketsError(#[from] aws_sdk_s3::error::SdkError), + + #[error("S3 error: {0}")] + GetObjectError(#[from] aws_sdk_s3::error::SdkError), + + #[error("S3 error: {0}")] + ByteStreamError(#[from] ByteStreamError), + #[error("serde_json error: {0}")] SerdeJsonError(#[from] serde_json::error::Error) } \ No newline at end of file diff --git a/src/model/kinesis_message.rs b/src/model/kinesis_message.rs deleted file mode 100644 index 0597bb7..0000000 --- a/src/model/kinesis_message.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::mem::size_of; -use serde::{Deserialize, Serialize}; - -#[repr(u8)] -#[derive(Debug, PartialEq)] -pub enum KinesisMessageType { - DeviceUpsert = 0x01 -} - -#[repr(C)] -#[derive(Serialize, Deserialize, PartialEq, Debug)] -pub struct KinesisMessage { - pub action: u8, // data type - pub size: i32, // data size - pub data: Vec // data -} - -impl KinesisMessage { - pub fn new(action: u8, size: i32, data: Vec) -> KinesisMessage { - KinesisMessage { - action, - size, - data, - } - } - - pub fn encode(&self) -> Vec { - let mut bin = Vec::new(); - bin.push(self.action); - bin.extend_from_slice(&self.size.to_ne_bytes()); - bin.extend_from_slice(&self.data); - bin - } - - pub fn decode(encoded: Vec) -> KinesisMessage { - let mut current_index = 0; - let action = encoded[current_index]; - current_index = current_index + size_of::(); - - let size_vec = encoded.get(current_index..current_index + size_of::()).unwrap().try_into().unwrap(); - let size = i32::from_ne_bytes(size_vec); - current_index = current_index + size_of::(); - - let data = encoded[current_index..current_index + size as usize].to_vec(); - - KinesisMessage { - action, - size, - data, - } - } -} \ No newline at end of file diff --git a/src/model/kinesis_message_detail.rs b/src/model/kinesis_message_detail.rs deleted file mode 100644 index df94111..0000000 --- a/src/model/kinesis_message_detail.rs +++ /dev/null @@ -1,41 +0,0 @@ -use serde::{Deserialize, Serialize}; - -pub trait KinesisMessageDetail { - fn encode(&self) -> Vec; - fn decode(encoded: Vec) -> T; -} - -#[derive(Serialize, Deserialize, PartialEq, Debug)] -#[repr(C)] -pub struct UpsertKinesisMessageDetail { - pub finger_print: String, - pub so_id: String, - pub uuid: String, - pub use_personalized_ad: bool, - pub test: Option, - pub zip_code: Option, - pub free_storage: i64, - pub used_storage: i64, - pub cached_storage: i64, - pub model_name: String, - pub firmware_build_date: Option, - pub firmware_ver: Option, - pub full_firmware_ver: Option, - pub app_version: String, - pub platform_ad_id: Option, - pub a_key: Option, - pub is_exist: bool, - pub device_id: i64 -} - -impl KinesisMessageDetail for UpsertKinesisMessageDetail -where - T: Serialize + serde::de::DeserializeOwned,{ - fn encode(&self) -> Vec { - bincode::serialize(&self).unwrap() - } - - fn decode(encoded: Vec) -> T { - bincode::deserialize(&encoded).unwrap() - } -} \ No newline at end of file diff --git a/src/model/mod.rs b/src/model/mod.rs index 6d2e461..c49f011 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -1,4 +1,2 @@ -pub mod kinesis_message_detail; -pub mod kinesis_message; pub mod error; pub mod orchestration; \ No newline at end of file