Browse Source

0.1.2

master 0.1.2
Ryan 1 year ago
parent
commit
3f2e922bcc
  1. 9
      Cargo.toml
  2. 6
      RELEASE_NOTE.md
  3. 19
      src/config.rs
  4. 6
      src/constants.rs
  5. 26
      src/database.rs
  6. 3
      src/lib.rs
  7. 14
      src/log.rs
  8. 30
      src/s3.rs

9
Cargo.toml

@ -1,6 +1,6 @@
[package]
name = "device-common"
version = "0.1.1"
version = "0.1.2"
edition = "2021"
authors = ["Ryan Bae <jh.bae@anypointmedia.com>"]
@ -13,6 +13,8 @@ aws-sdk-kinesis = { version = "1.36", features = ["behavior-version-latest"] }
aws-sdk-ssm = "1.40"
aws-types="1.3"
aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] }
rusoto_core = "0.48"
rusoto_s3 = "0.48"
deadpool-postgres = "0.14"
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
tracing = "0.1"
@ -21,7 +23,7 @@ tracing-unwrap = "1.0"
tracing-appender = "0.2"
thiserror = "1.0"
tokio = { version = "1.39", features = ["full"] }
lapin = "2.3"
lapin = "2.5"
sysinfo = "0.30"
prometheus = "0.13"
dashmap = "6.0"
@ -29,9 +31,6 @@ toml = "0.8"
serial_test = "3.1"
async-trait = "0.1"
[dev-dependencies]
mockall = "0.13"
[profile.dev]
opt-level = 0

6
RELEASE_NOTE.md

@ -5,4 +5,8 @@ first deploy
0.1.1
======
- write test code sample
- used tokio::sync::OnceCell instead of std::sync::OnceLock for async
- used tokio::sync::OnceCell instead of std::sync::OnceLock for async
0.1.2
======
- create log directory

19
src/config.rs

@ -5,10 +5,6 @@ use tokio::sync::OnceCell;
use crate::constants;
use crate::parameter_store::get_parameter;
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Log {
pub level: String,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Rabbitmq {
pub mapper: RabbitmqDetail,
@ -52,6 +48,7 @@ pub struct AwsKinesisStream {
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Config {
pub database: Database,
pub campaign_database: Database,
pub rabbitmq: Rabbitmq,
pub kinesis_stream: AwsKinesisStream,
}
@ -70,6 +67,16 @@ async fn initialize_data() -> Arc<Config> {
password: get_parameter(constants::DB_PASSWORD_KEY.to_string()).await,
name: get_parameter(constants::DB_NAME_KEY.to_string()).await,
},
campaign_database: Database {
host: get_parameter(constants::DB_CAMPAIGN_HOST_KEY.to_string()).await,
port: get_parameter( constants::DB_CAMPAIGN_PORT_KEY.to_string())
.await
.parse()
.unwrap_or_log(),
user: get_parameter(constants::DB_CAMPAIGN_USER_KEY.to_string()).await,
password: get_parameter(constants::DB_CAMPAIGN_PASSWORD_KEY.to_string()).await,
name: get_parameter(constants::DB_CAMPAIGN_NAME_KEY.to_string()).await,
},
rabbitmq: Rabbitmq {
mapper: RabbitmqDetail {
host: get_parameter(constants::RABBITMQ_HOST_KEY.to_string()).await,
@ -111,5 +118,7 @@ async fn initialize_data() -> Arc<Config> {
}
pub async fn get_config() -> Arc<Config> {
CONFIG.get_or_init(initialize_data).await.clone()
CONFIG.get_or_init(initialize_data)
.await
.clone()
}

6
src/constants.rs

@ -26,6 +26,12 @@ pub const DB_USER_KEY: &str = "/flower/device/common/db/user";
pub const DB_PASSWORD_KEY: &str = "/flower/device/common/db/password";
pub const DB_NAME_KEY: &str = "/flower/device/common/db/name";
pub const DB_CAMPAIGN_HOST_KEY: &str = "/flower/device/common/campaign/db/host";
pub const DB_CAMPAIGN_PORT_KEY: &str = "/flower/device/common/campaign/db/port";
pub const DB_CAMPAIGN_USER_KEY: &str = "/flower/device/common/campaign/db/user";
pub const DB_CAMPAIGN_PASSWORD_KEY: &str = "/flower/device/common/campaign/db/password";
pub const DB_CAMPAIGN_NAME_KEY: &str = "/flower/device/common/campaign/db/name";
pub const ENV_AWS_ACCESS_KEY: &str = "AWS_ACCESS_KEY";
pub const ENV_AWS_SECRET_KEY: &str = "AWS_SECRET_KEY";
pub const ENV_AWS_REGION: &str = "AWS_REGION";

26
src/database.rs

@ -5,8 +5,9 @@ use tokio_postgres::NoTls;
use crate::config::get_config;
static CONNECTION_POOL: OnceCell<Arc<Pool>> = OnceCell::const_new();
static CAMPAIGN_CONNECTION_POOL: OnceCell<Arc<Pool>> = OnceCell::const_new();
async fn initialize_data() -> Arc<Pool> {
async fn initialize_connection_pool() -> Arc<Pool> {
let config = get_config().await;
let mut pg = Config::new();
@ -22,7 +23,28 @@ async fn initialize_data() -> Arc<Pool> {
}
pub async fn get_connection_pool() -> Arc<Pool> {
CONNECTION_POOL.get_or_init(initialize_data)
CONNECTION_POOL.get_or_init(initialize_connection_pool)
.await
.clone()
}
/// Builds the connection pool for the *campaign* database.
///
/// Reads the `campaign_database` section of the shared config and panics if
/// the pool cannot be created — pool construction is a startup-time invariant.
async fn initialize_campaign_connection_pool() -> Arc<Pool> {
    let config = get_config().await;
    let mut pg = Config::new();
    // Bug fix: this function previously copied `config.database.*`, so the
    // "campaign" pool silently pointed at the primary database. It must use
    // the dedicated `campaign_database` settings (backed by the
    // DB_CAMPAIGN_* parameter-store keys).
    pg.host = Some(config.campaign_database.host.clone());
    pg.user = Some(config.campaign_database.user.clone());
    pg.password = Some(config.campaign_database.password.clone());
    pg.dbname = Some(config.campaign_database.name.clone());
    pg.port = Some(config.campaign_database.port as u16);
    let pool = pg
        .create_pool(Some(Runtime::Tokio1), NoTls)
        .expect("failed to create campaign database connection pool");
    Arc::new(pool)
}
/// Returns the process-wide campaign-database pool, creating it on first use.
///
/// The pool is initialized at most once via the `CAMPAIGN_CONNECTION_POOL`
/// once-cell; callers receive a cheap `Arc` handle to the shared pool.
pub async fn get_campaign_connection_pool() -> Arc<Pool> {
    let pool = CAMPAIGN_CONNECTION_POOL
        .get_or_init(initialize_campaign_connection_pool)
        .await;
    Arc::clone(pool)
}

3
src/lib.rs

@ -8,4 +8,5 @@ pub mod env;
pub mod orchestration;
pub mod metrics;
pub mod parameter_store;
pub mod constants;
pub mod constants;
mod s3;

14
src/log.rs

@ -1,5 +1,6 @@
use std::env::current_exe;
use std::fs;
use std::path::Path;
use tracing::{error, Level, subscriber};
use tracing_appender::non_blocking::{WorkerGuard};
use tracing_appender::rolling::{RollingFileAppender, Rotation};
@ -58,7 +59,7 @@ pub async fn init(log_file_name: &str) -> WorkerGuard {
let mut level: Level = Level::INFO;
let log_level_parameter_key = get_log_level_parameter_key()
.expect_or_log("log level parameter key not found");
.expect("log level parameter key not found");
let log_level = get_parameter(log_level_parameter_key).await;
@ -69,11 +70,16 @@ pub async fn init(log_file_name: &str) -> WorkerGuard {
_ => {}
}
let log_dir = get_log_dir();
if !Path::new(&log_dir).exists() {
fs::create_dir_all(&log_dir).expect("failed to create log directory");
}
let file_appender = RollingFileAppender::builder()
.rotation(Rotation::DAILY)
.filename_suffix(log_file_name)
.build(get_log_dir())
.expect_or_log("failed to initialize rolling file appender");
.build(log_dir)
.expect("failed to initialize rolling file appender");
let (file_writer, _guard) = tracing_appender::non_blocking(file_appender);
@ -85,7 +91,7 @@ pub async fn init(log_file_name: &str) -> WorkerGuard {
.finish();
subscriber::set_global_default(subscriber)
.expect_or_log("trace init fail");
.expect("trace init fail");
_guard
}

30
src/s3.rs

@ -0,0 +1,30 @@
use std::sync::Arc;
use aws_credential_types::Credentials;
use aws_sdk_kinesis::{Client, Config};
use rusoto_core::{Region, HttpClient};
use rusoto_core::credential::{StaticProvider, ProvideAwsCredentials};
use rusoto_s3::{S3Client, S3, GetObjectRequest};
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
static AWS_S3_CLIENT: OnceCell<Arc<S3Client>> = OnceCell::const_new();
/// Builds the shared rusoto S3 client from the Kinesis-node AWS settings
/// (access key, secret key, region) held in the global config.
///
/// Panics if the HTTP dispatcher cannot be created or the configured region
/// string is not a valid AWS region — both are startup-time invariants.
// NOTE(review): credentials/region come from `kinesis_stream.node`, i.e. the
// S3 client reuses the Kinesis credentials — confirm this is intentional.
async fn initialize_data() -> Arc<S3Client> {
    let config = crate::config::get_config().await;
    let node = &config.kinesis_stream.node;
    // The original took references to throwaway clones (`&field.clone()`)
    // and then called `.to_string()` on them — clone once, directly.
    let credentials_provider =
        StaticProvider::new_minimal(node.access_key.clone(), node.secret_key.clone());
    let region = node
        .region
        .parse()
        .expect("invalid AWS region in config");
    let dispatcher = HttpClient::new().expect("failed to create request dispatcher");
    let s3_client = S3Client::new_with(dispatcher, credentials_provider, region);
    Arc::new(s3_client)
}
/// Returns the lazily-initialized, process-wide S3 client handle.
///
/// Initialization runs at most once via the `AWS_S3_CLIENT` once-cell;
/// every caller gets a cheap `Arc` clone of the same client.
async fn get_client() -> Arc<S3Client> {
    let client = AWS_S3_CLIENT.get_or_init(initialize_data).await;
    Arc::clone(client)
}
Loading…
Cancel
Save