From ce7789b245c68405a129925f1a32c47c8c29e415 Mon Sep 17 00:00:00 2001 From: Ryan Date: Fri, 26 Jul 2024 13:48:48 +0900 Subject: [PATCH] first commit --- .gitignore | 53 ++++++++++ Cargo.toml | 36 +++++++ src/config.rs | 115 ++++++++++++++++++++ src/constants.rs | 31 ++++++ src/database.rs | 95 +++++++++++++++++ src/env.rs | 18 ++++ src/kinesis.rs | 52 +++++++++ src/lib.rs | 11 ++ src/log.rs | 98 +++++++++++++++++ src/metrics.rs | 42 ++++++++ src/model/error.rs | 34 ++++++ src/model/kinesis_message.rs | 52 +++++++++ src/model/kinesis_message_detail.rs | 41 ++++++++ src/model/mod.rs | 4 + src/model/orchestration.rs | 23 ++++ src/orchestration.rs | 157 ++++++++++++++++++++++++++++ src/parameter_store.rs | 91 ++++++++++++++++ 17 files changed, 953 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/config.rs create mode 100644 src/constants.rs create mode 100644 src/database.rs create mode 100644 src/env.rs create mode 100644 src/kinesis.rs create mode 100644 src/lib.rs create mode 100644 src/log.rs create mode 100644 src/metrics.rs create mode 100644 src/model/error.rs create mode 100644 src/model/kinesis_message.rs create mode 100644 src/model/kinesis_message_detail.rs create mode 100644 src/model/mod.rs create mode 100644 src/model/orchestration.rs create mode 100644 src/orchestration.rs create mode 100644 src/parameter_store.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..057aa6a --- /dev/null +++ b/.gitignore @@ -0,0 +1,53 @@ +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb### Rust template +# Generated by Cargo +# will have compiled files and executables +debug/ +release/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb +*.idea + +.idea/ + +### macOS template +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two +Icon + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +# Gerrit +/codereview +/buildSrc +detekt.yml \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..6ae6568 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "device-common" +version = "0.1.0" +edition = "2021" +publish = ["nexus","https://maven.anypoint.tv/repository/crate/index"] +repository = "https://Ryan:HEG2XSI4fF88+zR2DkVuiEzNXRMvC9Bd077s7qMpyg@" +authors = ["Ryan Bae "] + +[dependencies] +serde = { version = "1.0.202", features = ["derive"] } +serde_json = "1.0.117" +bincode = "1.3.3" +aws-config = { version = "1.5.0" } +aws-sdk-kinesis = { version = "1.27.0", features = ["behavior-version-latest"] } +aws-sdk-ssm = "1.39" +aws-types="1.3" +aws-credential-types = { version = "1.2.0", features = ["hardcoded-credentials"] } +deadpool-postgres = "0.14" +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-unwrap = "1.0" +tracing-appender = "0.2.3" +thiserror = "1.0" +tokio = "1.38.0" +lapin = "2.3" +sysinfo = "0.30" +prometheus = "0.13" +dashmap = "6.0" +toml = "0.8" + +[profile.dev] +opt-level = 0 + +[profile.release] +opt-level = 3 \ No newline at end of file diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..114782c --- /dev/null +++ b/src/config.rs @@ -0,0 +1,115 @@ +use std::sync::Arc; +use serde::{Deserialize, Serialize}; +use tracing_unwrap::ResultExt; +use tokio::sync::OnceCell; +use crate::constants; +use crate::parameter_store::get_parameter; + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct Log { + pub level: String, +} +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct Rabbitmq { + pub mapper: RabbitmqDetail, + pub orchestration: RabbitmqDetail +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct RabbitmqDetail { + pub host: String, + pub port: i64, + pub username: String, + pub password: String, + pub exchange: String, + pub queue: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct Database { + pub host: String, + pub port: i64, + pub user: String, + pub password: String, + pub name: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct AwsKinesisStreamDetail { + pub access_key: String, + pub secret_key: String, + pub region: String, + pub name: String, + pub partition_key: String, + pub arn: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct AwsKinesisStream { + pub node: AwsKinesisStreamDetail, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct Config { + pub database: Database, + pub rabbitmq: Rabbitmq, + pub kinesis_stream: AwsKinesisStream, +} + +static CONFIG: OnceCell> = OnceCell::const_new(); + +async fn initialize_data() -> Arc { + let config = Config { + database: Database { + host: get_parameter(constants::DB_HOST_KEY.to_string()).await, + port: get_parameter( constants::DB_PORT_KEY.to_string()) + .await + .parse() + .unwrap_or_log(), + user: get_parameter(constants::DB_USER_KEY.to_string()).await, + password: get_parameter(constants::DB_PASSWORD_KEY.to_string()).await, + name: get_parameter(constants::DB_NAME_KEY.to_string()).await, + }, + rabbitmq: Rabbitmq { + mapper: RabbitmqDetail { + host: get_parameter(constants::RABBITMQ_HOST_KEY.to_string()).await, + port: get_parameter(constants::RABBITMQ_PORT_KEY.to_string()) + .await + .parse() + .unwrap_or_log(), + username: get_parameter(constants::RABBITMQ_USER_KEY.to_string()).await, + password: get_parameter(constants::RABBITMQ_PASSWORD_KEY.to_string()).await, + exchange: get_parameter(constants::RABBITMQ_MAPPER_EXCHANGE_KEY.to_string()).await, + queue: get_parameter(constants::RABBITMQ_MAPPER_QUEUE_KEY.to_string()).await, + }, + + orchestration: RabbitmqDetail { + host: get_parameter(constants::RABBITMQ_HOST_KEY.to_string()).await, + port: get_parameter(constants::RABBITMQ_PORT_KEY.to_string()) + .await + .parse() + .unwrap_or_log(), + username: get_parameter(constants::RABBITMQ_USER_KEY.to_string()).await, + password: get_parameter(constants::RABBITMQ_PASSWORD_KEY.to_string()).await, + exchange: get_parameter(constants::RABBITMQ_ORCHESTRATION_EXCHANGE_KEY.to_string()).await, + queue: get_parameter(constants::RABBITMQ_ORCHESTRATION_QUEUE_KEY.to_string()).await, + }, + }, + kinesis_stream: AwsKinesisStream { + node: AwsKinesisStreamDetail { + access_key: crate::env::get(constants::ENV_AWS_ACCESS_KEY).expect_or_log("env aws_access_key not found"), + secret_key: crate::env::get(constants::ENV_AWS_SECRET_KEY).expect_or_log("env aws_secret_key not found"), + region: crate::env::get(constants::ENV_AWS_REGION).expect_or_log("env aws_region not found"), + name: get_parameter(constants::KINESIS_PERSISTENCE_NAME_KEY.to_string()).await, + partition_key: get_parameter(constants::KINESIS_PERSISTENCE_PARTITION_KEY.to_string()).await, + arn: get_parameter(constants::KINESIS_PERSISTENCE_ARN_KEY.to_string()).await, + }, + }, + }; + + Arc::new(config) +} + +pub async fn get_config() -> Arc { + CONFIG.get_or_init(initialize_data).await.clone() +} \ No newline at end of file diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..d7a7d99 --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,31 @@ +pub const GATEWAY_LOG_LEVEL_KEY: &str = "/flower/device/gateway/log/level"; +pub const NODE_LOG_LEVEL_KEY: &str = "/flower/device/node/log/level"; +pub const TICKET_LOG_LEVEL_KEY: &str = "/flower/device/ticket/log/level"; +pub const FAILOVER_LOG_LEVEL_KEY: &str = "/flower/device/failover/log/level"; + +pub const RABBITMQ_HOST_KEY: &str = "/flower/device/common/rabbitmq/host"; +pub const RABBITMQ_PORT_KEY: &str = "/flower/device/common/rabbitmq/port"; +pub const RABBITMQ_USER_KEY: &str = "/flower/device/common/rabbitmq/user"; +pub const RABBITMQ_PASSWORD_KEY: &str = "/flower/device/common/rabbitmq/password"; +pub const RABBITMQ_MAPPER_EXCHANGE_KEY: &str = "/flower/device/mapper/rabbitmq/exchange"; +pub const RABBITMQ_MAPPER_QUEUE_KEY: &str = "/flower/device/mapper/rabbitmq/queue"; +pub const RABBITMQ_ORCHESTRATION_EXCHANGE_KEY: &str = "/flower/device/orchestration/rabbitmq/exchange"; +pub const RABBITMQ_ORCHESTRATION_QUEUE_KEY: &str = "/flower/device/orchestration/rabbitmq/queue"; + +pub const KINESIS_PERSISTENCE_NAME_KEY: &str = "/flower/device/persistence/kinesis/name"; +pub const KINESIS_PERSISTENCE_PARTITION_KEY: &str = "/flower/device/persistence/kinesis/partition_key"; +pub const KINESIS_PERSISTENCE_ARN_KEY: &str = "/flower/device/persistence/kinesis/arn"; + +pub const KINESIS_FAILOVER_NAME_KEY: &str = "/flower/device/failover/kinesis/name"; +pub const KINESIS_FAILOVER_PARTITION_KEY: &str = "/flower/device/failover/kinesis/partition_key"; +pub const KINESIS_FAILOVER_ARN_KEY: &str = "/flower/device/failover/kinesis/arn"; + +pub const DB_HOST_KEY: &str = "/flower/device/common/db/host"; +pub const DB_PORT_KEY: &str = "/flower/device/common/db/port"; +pub const DB_USER_KEY: &str = "/flower/device/common/db/user"; +pub const DB_PASSWORD_KEY: &str = "/flower/device/common/db/password"; +pub const DB_NAME_KEY: &str = "/flower/device/common/db/name"; + +pub const ENV_AWS_ACCESS_KEY: &str = "AWS_ACCESS_KEY"; +pub const ENV_AWS_SECRET_KEY: &str = "AWS_SECRET_KEY"; +pub const ENV_AWS_REGION: &str = "AWS_REGION"; \ No newline at end of file diff --git a/src/database.rs b/src/database.rs new file mode 100644 index 0000000..dc298b0 --- /dev/null +++ b/src/database.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; +use deadpool_postgres::{Pool, Config, Runtime}; +use tokio::sync::OnceCell; +use tokio_postgres::NoTls; +use crate::config::get_config; + +static CONNECTION_POOL: OnceCell> = OnceCell::const_new(); + +async fn initialize_data() -> Arc { + let config = get_config().await; + + let mut pg = Config::new(); + pg.host = Some(config.database.host.clone()); + pg.user = Some(config.database.user.clone()); + pg.password = Some(config.database.password.clone()); + pg.dbname = Some(config.database.name.clone()); + pg.port = Some(config.database.port as u16); + + let pool = pg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap(); + + Arc::new(pool) +} + +/* +pub async fn initialize_data() -> Arc { + let config = get_config().await; + + let mut current_dir = current_exe().unwrap(); + current_dir.pop(); + current_dir.push("db.pem"); + let cert_file = File::open(current_dir.display().to_string()).unwrap(); + let mut buf = BufReader::new(cert_file); + let mut root_store = rustls::RootCertStore::empty(); + for cert in rustls_pemfile::certs(&mut buf) { + root_store.add(cert.unwrap()).unwrap(); + } + + let tls_config = rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + + let tls = MakeRustlsConnect::new(tls_config); + + let mut pg = deadpool_postgres::Config::new(); + pg.host = Some(config.database.host.clone()); + pg.user = Some(config.database.user.clone()); + pg.password = Some(config.database.password.clone()); + pg.dbname = Some(config.database.name.clone()); + pg.port = Some(config.database.port as u16); + + let pool = pg.create_pool(Some(Runtime::Tokio1), tls).unwrap(); + + Arc::new(pool) +} +*/ + +/* +pub async fn initialize_data() -> Arc { + let config = get_config().await; + + let mut current_dir = current_exe().unwrap(); + current_dir.pop(); + current_dir.push("db.pem"); + let cert_file = File::open(current_dir.display().to_string()).unwrap(); + let mut buf = BufReader::new(cert_file); + let mut root_store = rustls::RootCertStore::empty(); + for cert in rustls_pemfile::certs(&mut buf) { + root_store.add(cert.unwrap()).unwrap(); + } + + let tls_config = TlsConnector::builder() + .danger_accept_invalid_certs(true) + .build() + .expect_or_log(""); + + let tls = MakeTlsConnector::new(tls_config); + + let mut pg = deadpool_postgres::Config::new(); + pg.host = Some(config.database.host.clone()); + pg.user = Some(config.database.user.clone()); + pg.password = Some(config.database.password.clone()); + pg.dbname = Some(config.database.name.clone()); + pg.port = Some(config.database.port as u16); + + let pool = pg.create_pool(Some(Runtime::Tokio1), tls).unwrap(); + + Arc::new(pool) +} +*/ + +pub async fn get_connection_pool() -> Arc { + CONNECTION_POOL.get_or_init(initialize_data) + .await + .clone() +} \ No newline at end of file diff --git a/src/env.rs b/src/env.rs new file mode 100644 index 0000000..e32d8af --- /dev/null +++ b/src/env.rs @@ -0,0 +1,18 @@ +use std::env; +use std::error::Error; +use std::str::FromStr; + +pub fn set(key: &str, val: &str) { + env::set_var(key, val); +} + +pub fn get(key: &str) -> Result> { + let ret = env::var(key)?; + Ok(ret) +} + +pub fn get_i64(key: &str) -> Result> { + let str_env = env::var(key)?; + let ret = i64::from_str(&str_env)?; + Ok(ret) +} \ No newline at end of file diff --git a/src/kinesis.rs b/src/kinesis.rs new file mode 100644 index 0000000..9d4ba2c --- /dev/null +++ b/src/kinesis.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; +use aws_config::Region; +use aws_sdk_kinesis::{Client, Config}; +use aws_sdk_kinesis::primitives::Blob; +use aws_credential_types::Credentials; +use tokio::sync::OnceCell; +use crate::model::error::DeviceError; +use crate::model::kinesis_message::KinesisMessage; + +static AWS_KINESIS_CONSUMER_CLIENT: OnceCell> = OnceCell::const_new(); + +async fn initialize_data() -> Arc { + let config = crate::config::get_config() + .await; + + let access_key_id = &config.kinesis_stream.node.access_key.clone(); + let secret_access_key = &config.kinesis_stream.node.secret_key.clone(); + let region = &config.kinesis_stream.node.region.clone(); + + let credentials = Credentials::from_keys(access_key_id, secret_access_key, None); + let aws_config = Config::builder() + .credentials_provider(credentials) + .region(Region::new(region.clone())) + .build(); + + Arc::new(Client::from_conf(aws_config)) +} + +async fn get_client() -> Arc { + AWS_KINESIS_CONSUMER_CLIENT.get_or_init(initialize_data) + .await + .clone() +} + +pub async fn kinesis_put_record(msg: &KinesisMessage) -> Result<(), DeviceError> { + let blob = msg.encode(); + let config = crate::config::get_config() + .await + .clone(); + + get_client() + .await + .put_record() + .data(Blob::new(blob)) + .partition_key(&config.kinesis_stream.node.partition_key) + .stream_name(&config.kinesis_stream.node.name) + .stream_arn(&config.kinesis_stream.node.arn) + .send() + .await?; + + Ok(()) +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..39c0268 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,11 @@ + +pub mod config; +pub mod model; +pub mod database; +pub mod kinesis; +pub mod log; +pub mod env; +pub mod orchestration; +pub mod metrics; +pub mod parameter_store; +pub mod constants; \ No newline at end of file diff --git a/src/log.rs b/src/log.rs new file mode 100644 index 0000000..dad4ae2 --- /dev/null +++ b/src/log.rs @@ -0,0 +1,98 @@ +use std::env::current_exe; +use std::fs; +use tracing::{Level, subscriber}; +use tracing_appender::non_blocking::{WorkerGuard}; +use tracing_appender::rolling::{RollingFileAppender, Rotation}; +use tracing_unwrap::{OptionExt, ResultExt}; +use crate::constants::{FAILOVER_LOG_LEVEL_KEY, GATEWAY_LOG_LEVEL_KEY, NODE_LOG_LEVEL_KEY, TICKET_LOG_LEVEL_KEY}; +use crate::parameter_store::get_parameter; + +fn read_cargo_toml() -> toml::Value{ + let cargo_toml_content = fs::read_to_string("Cargo.toml") + .expect_or_log("Failed to read Cargo.toml file"); + + let cargo_toml: toml::Value = toml::from_str(&cargo_toml_content) + .expect_or_log("Failed to parse Cargo.toml"); + + cargo_toml +} + +fn get_app_name() -> Option { + let cargo_toml = read_cargo_toml(); + + let package = cargo_toml + .get("package") + .expect("`[package]` section is missing in Cargo.toml"); + + let name = package + .get("name") + .expect_or_log("`name` field is missing in Cargo.toml"); + + let name_str = name + .as_str() + .expect_or_log("Package name is not a string"); + + Option::from(name_str.to_string()) +} + +fn get_log_level_parameter_key() -> Option { + let mut ret: Option = None; + + match get_app_name() { + None => { ret = None } + Some(app_name) => { + match app_name.as_str() { + "device-gateway" => { ret = Option::from(GATEWAY_LOG_LEVEL_KEY.to_string()) } + "device-node" => { ret = Option::from(NODE_LOG_LEVEL_KEY.to_string()) } + "device-ticket" => { ret = Option::from(TICKET_LOG_LEVEL_KEY.to_string()) } + "device-failover" => { ret = Option::from(FAILOVER_LOG_LEVEL_KEY.to_string()) } + _ => { return ret } + } + } + } + + ret +} + +pub async fn init(log_file_name: &str) -> WorkerGuard { + let mut level: Level = Level::INFO; + + let log_level_parameter_key = get_log_level_parameter_key() + .expect_or_log("log level parameter key not found"); + + let log_level = get_parameter(log_level_parameter_key).await; + + match log_level.to_lowercase().as_str() { + "debug" => { level = Level::DEBUG }, + "info" => { level = Level::INFO }, + "warn" => { level = Level::WARN }, + _ => {} + } + + let file_appender = RollingFileAppender::builder() + .rotation(Rotation::DAILY) + .filename_suffix(log_file_name) + .build(get_log_dir()) + .expect_or_log("failed to initialize rolling file appender"); + + let (file_writer, _guard) = tracing_appender::non_blocking(file_appender); + + let subscriber = tracing_subscriber::fmt() + .with_max_level(level) + .with_writer(file_writer) + //.with_writer(console_writer) + .with_ansi(false) + .finish(); + + subscriber::set_global_default(subscriber) + .expect_or_log("trace init fail"); + + _guard +} + +fn get_log_dir() -> String { + let mut current_dir = current_exe().unwrap(); + current_dir.pop(); + current_dir.push("log"); + current_dir.display().to_string() +} \ No newline at end of file diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..d000783 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,42 @@ +use sysinfo::System; +use prometheus::{register_gauge, Gauge}; +use std::sync::OnceLock; + +pub fn get_cpu_usage() -> &'static Gauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| { + register_gauge!("cpu_usage", "CPU Usage").unwrap() + }) +} + +pub fn get_memory_used() -> &'static Gauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| { + register_gauge!("memory_used", "Memory Used").unwrap() + }) +} + +pub fn get_memory_total() -> &'static Gauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| { + register_gauge!("memory_total", "Memory Total").unwrap() + }) +} + +pub fn get_memory_usage() -> &'static Gauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| { + register_gauge!("memory_usage", "Memory Usage").unwrap() + }) +} + +pub fn collect(system: &mut System) -> (f64, f64, f64, f64) { + system.refresh_all(); + + let cpu_usage = system.global_cpu_info().cpu_usage() as f64; + let memory_used = system.used_memory() as f64; + let memory_total = system.total_memory() as f64; + let memory_usage = memory_used / memory_total * 100.0; + + (cpu_usage, memory_used, memory_total, memory_usage) +} \ No newline at end of file diff --git a/src/model/error.rs b/src/model/error.rs new file mode 100644 index 0000000..3329658 --- /dev/null +++ b/src/model/error.rs @@ -0,0 +1,34 @@ +use aws_sdk_kinesis::operation::put_record::PutRecordError; +use aws_sdk_ssm::operation::get_parameter::GetParameterError; +use thiserror::Error; +use tokio_postgres::Error as PostgresError; + +#[derive(Error, Debug)] +pub enum DeviceError { + #[error("flower-device-app / An error occurred: {0}")] + GeneralError(String), + + #[error("I/O error")] + IoError(#[from] std::io::Error), + + #[error("VarError")] + VarError(#[from] std::env::VarError), + + #[error("Parse error: {0}")] + ParseError(#[from] std::num::ParseIntError), + + #[error("Database error: {0}")] + DatabaseError(#[from] PostgresError), + + #[error("RabbitMq error: {0}")] + RabbitMqError(#[from] lapin::Error), + + #[error("Kinesis Stream error: {0}")] + PutRecordError(#[from] aws_sdk_kinesis::error::SdkError), + + #[error("Parameter Store error: {0}")] + GetParameterError(#[from] aws_sdk_ssm::error::SdkError), + + #[error("serde_json error: {0}")] + SerdeJsonError(#[from] serde_json::error::Error) +} \ No newline at end of file diff --git a/src/model/kinesis_message.rs b/src/model/kinesis_message.rs new file mode 100644 index 0000000..0597bb7 --- /dev/null +++ b/src/model/kinesis_message.rs @@ -0,0 +1,52 @@ +use std::mem::size_of; +use serde::{Deserialize, Serialize}; + +#[repr(u8)] +#[derive(Debug, PartialEq)] +pub enum KinesisMessageType { + DeviceUpsert = 0x01 +} + +#[repr(C)] +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct KinesisMessage { + pub action: u8, // data type + pub size: i32, // data size + pub data: Vec // data +} + +impl KinesisMessage { + pub fn new(action: u8, size: i32, data: Vec) -> KinesisMessage { + KinesisMessage { + action, + size, + data, + } + } + + pub fn encode(&self) -> Vec { + let mut bin = Vec::new(); + bin.push(self.action); + bin.extend_from_slice(&self.size.to_ne_bytes()); + bin.extend_from_slice(&self.data); + bin + } + + pub fn decode(encoded: Vec) -> KinesisMessage { + let mut current_index = 0; + let action = encoded[current_index]; + current_index = current_index + size_of::(); + + let size_vec = encoded.get(current_index..current_index + size_of::()).unwrap().try_into().unwrap(); + let size = i32::from_ne_bytes(size_vec); + current_index = current_index + size_of::(); + + let data = encoded[current_index..current_index + size as usize].to_vec(); + + KinesisMessage { + action, + size, + data, + } + } +} \ No newline at end of file diff --git a/src/model/kinesis_message_detail.rs b/src/model/kinesis_message_detail.rs new file mode 100644 index 0000000..df94111 --- /dev/null +++ b/src/model/kinesis_message_detail.rs @@ -0,0 +1,41 @@ +use serde::{Deserialize, Serialize}; + +pub trait KinesisMessageDetail { + fn encode(&self) -> Vec; + fn decode(encoded: Vec) -> T; +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[repr(C)] +pub struct UpsertKinesisMessageDetail { + pub finger_print: String, + pub so_id: String, + pub uuid: String, + pub use_personalized_ad: bool, + pub test: Option, + pub zip_code: Option, + pub free_storage: i64, + pub used_storage: i64, + pub cached_storage: i64, + pub model_name: String, + pub firmware_build_date: Option, + pub firmware_ver: Option, + pub full_firmware_ver: Option, + pub app_version: String, + pub platform_ad_id: Option, + pub a_key: Option, + pub is_exist: bool, + pub device_id: i64 +} + +impl KinesisMessageDetail for UpsertKinesisMessageDetail +where + T: Serialize + serde::de::DeserializeOwned,{ + fn encode(&self) -> Vec { + bincode::serialize(&self).unwrap() + } + + fn decode(encoded: Vec) -> T { + bincode::deserialize(&encoded).unwrap() + } +} \ No newline at end of file diff --git a/src/model/mod.rs b/src/model/mod.rs new file mode 100644 index 0000000..6d2e461 --- /dev/null +++ b/src/model/mod.rs @@ -0,0 +1,4 @@ +pub mod kinesis_message_detail; +pub mod kinesis_message; +pub mod error; +pub mod orchestration; \ No newline at end of file diff --git a/src/model/orchestration.rs b/src/model/orchestration.rs new file mode 100644 index 0000000..d605905 --- /dev/null +++ b/src/model/orchestration.rs @@ -0,0 +1,23 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone)] +pub struct OrchestrationConfig { + pub quotient: i64, + pub shard_count: i32, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct OrchestrationNodeInfo { + pub id: i64, + pub platform_id: i32, + pub remains: Vec, + pub endpoint: String, + pub status: i32 +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct OrchestrationGatewayInfo { + pub id: i64, + pub endpoint: String, + pub ticket_endpoint: String, +} \ No newline at end of file diff --git a/src/orchestration.rs b/src/orchestration.rs new file mode 100644 index 0000000..5ca29b5 --- /dev/null +++ b/src/orchestration.rs @@ -0,0 +1,157 @@ +use std::env; +use std::str::FromStr; +use std::sync::{Arc, Mutex, OnceLock}; +use dashmap::DashMap; +use tracing::info; +use crate::model::error::DeviceError; +use crate::model::orchestration::{OrchestrationConfig, OrchestrationGatewayInfo, OrchestrationNodeInfo}; + +pub fn get_orchestration_config() -> &'static Arc> { + static ORCHESTRATION_CONFIG: OnceLock>> = OnceLock::new(); + ORCHESTRATION_CONFIG.get_or_init(|| Arc::new(Mutex::new(OrchestrationConfig { + quotient: 0, + shard_count: 0, + }))) +} + +pub fn get_orchestration_node_info() -> &'static Arc> { + static ORCHESTRATION_NODE_INFO: OnceLock>>= OnceLock::new(); + ORCHESTRATION_NODE_INFO.get_or_init(|| Arc::new(DashMap::new())) +} + +pub fn get_orchestration_gateway_info() -> &'static Arc> { + static ORCHESTRATION_GATEWAY_INFO: OnceLock>> = OnceLock::new(); + ORCHESTRATION_GATEWAY_INFO.get_or_init(|| Arc::new(DashMap::new())) +} + +pub async fn load_orchestration_config() -> Result<(), DeviceError> { + let pool = crate::database::get_connection_pool().await; + let client = pool.get().await.unwrap(); + let fetch_query = "SELECT * FROM orchestration_config"; + + let result = client + .query(fetch_query, &[]) + .await?; + + match result.get(0) { + None => { + return Err(DeviceError::GeneralError(String::from("empty orchestration config"))) + } + Some(v) => { + let mut config = get_orchestration_config().lock().unwrap(); + config.quotient = v.get("quotient"); + config.shard_count = v.get("shard_count"); + } + } + + Ok(()) +} + +pub async fn load_orchestration_node_info() -> Result<(), DeviceError> { + let pool = crate::database::get_connection_pool().await; + let client = pool.get().await.unwrap(); + let fetch_query = "SELECT * FROM orchestration_node_info"; + + let result = client + .query(fetch_query, &[]) + .await?; + + let cache = get_orchestration_node_info(); + cache.clear(); + + let mut count = 0; + for row in result.into_iter() { + let id: i64 = row.get("id"); + + let mut node_info = OrchestrationNodeInfo { + id, + platform_id: row.get("platform_id"), + remains: vec![], + endpoint: row.get("endpoint"), + status: row.get("status"), + }; + + let remain_str: String = row.get("remains"); + let remains: Vec<&str> = remain_str.split(',').collect(); + let int_vec = remains + .iter() + .map(|s| s.parse::().expect("parse error : invalid node id")) + .collect(); + node_info.remains = int_vec; + + cache.insert(id, node_info); + count += 1; + } + + info!("node_info loaded, count : {}", count); + + Ok(()) +} + +pub async fn load_orchestration_gateway_info() -> Result<(), DeviceError> { + let pool = crate::database::get_connection_pool().await; + let client = pool.get().await.unwrap(); + let fetch_query = "SELECT * FROM orchestration_gateway_info"; + + let result = client + .query(fetch_query, &[]) + .await?; + + let cache = get_orchestration_gateway_info(); + cache.clear(); + + let mut count = 0; + for row in result.into_iter() { + let id: i64 = row.get("id"); + + let gateway_info = OrchestrationGatewayInfo { + id, + endpoint: row.get("endpoint"), + ticket_endpoint: row.get("ticket_endpoint") + }; + + cache.insert(id, gateway_info); + count += 1; + } + + info!("gateway_info loaded, count : {}", count); + + Ok(()) +} + +pub fn get_node_info(id: i64) -> Option { + let node_cache = get_orchestration_node_info(); + + match node_cache.get_mut(&id) { + None => { None } + Some(node) => { + Option::from(node.value().clone()) + } + } +} + +pub fn get_node_info_by_device_id(device_id: i64) -> Option { + let node_cache = get_orchestration_node_info(); + + for node in node_cache.iter() { + let remain = get_orchestration_config() + .lock() + .unwrap() + .quotient % device_id; + + match node.remains.iter().find(|&&x| x == remain) { + Some(_) => { + return Option::from(node.value().clone()) + } + _ => {} + } + } + + None +} + +pub fn get_current_app_id() -> Result { + let id_str = env::var("ID")?; + let id = i64::from_str(&id_str)?; + Ok(id) +} \ No newline at end of file diff --git a/src/parameter_store.rs b/src/parameter_store.rs new file mode 100644 index 0000000..9c21abb --- /dev/null +++ b/src/parameter_store.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; +use aws_config::meta::region::RegionProviderChain; +use aws_credential_types::Credentials; +use aws_sdk_ssm::Client; +use aws_types::region::Region; +use tokio::sync::OnceCell; +use tracing_unwrap::{OptionExt, ResultExt}; +use crate::constants; + +static AWS_SSM_CLIENT: OnceCell> = OnceCell::const_new(); + +fn get_aws_credential() -> (String, String, String) { + let access_key = crate::env::get(constants::ENV_AWS_ACCESS_KEY) + .expect_or_log("env not found: AWS_ACCESS_KEY"); + let secret_key = crate::env::get(constants::ENV_AWS_SECRET_KEY) + .expect_or_log("env not found: AWS_SECRET_KEY"); + let region = crate::env::get(constants::ENV_AWS_REGION) + .expect_or_log("env not found: AWS_REGION"); + + (access_key, secret_key, region) +} + +async fn initialize_data() -> Arc { + let (access_key, secret_key, region) = get_aws_credential(); + + let region_provider = RegionProviderChain::default_provider().or_else(Region::new(region)); + let region = region_provider.region().await.unwrap(); + + let credentials = Credentials::from_keys( + access_key, + secret_key, + None, + ); + + let config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(region) + .credentials_provider(credentials) + .load() + .await; + + Arc::new(Client::new(&config)) +} + +async fn get_client() -> Arc { + AWS_SSM_CLIENT.get_or_init(initialize_data) + .await + .clone() +} + +pub fn get_prefix() -> String { + let mut profile: String = "".to_string(); + let name = "RUST_PROFILE"; + + match std::env::var(name) { + Ok(v) => profile = v, + Err(e) => panic!("${} is not set ({})", name, e), + } + + let mut prefix: String = "".to_string(); + match profile.as_str() { + "local" => { + prefix = "/dev".to_string(); + }, + _ => { + prefix = profile + } + }; + + prefix +} + +pub async fn get_parameter(key: String) -> String { + let key_profile = get_prefix() + &key; + let parameter_output = get_client() + .await + .get_parameter() + .name(key_profile) + .send() + .await + .expect_or_log("Parameter request fail"); + + let parameter = parameter_output + .parameter() + .expect_or_log("Parameter not found."); + + let parameter_value = parameter + .value() + .expect_or_log("Parameter has no value."); + + parameter_value.to_string() +} \ No newline at end of file