Browse Source

first commit

master 0.1.0
Ryan 1 year ago
commit
ce7789b245
  1. 53
      .gitignore
  2. 36
      Cargo.toml
  3. 115
      src/config.rs
  4. 31
      src/constants.rs
  5. 95
      src/database.rs
  6. 18
      src/env.rs
  7. 52
      src/kinesis.rs
  8. 11
      src/lib.rs
  9. 98
      src/log.rs
  10. 42
      src/metrics.rs
  11. 34
      src/model/error.rs
  12. 52
      src/model/kinesis_message.rs
  13. 41
      src/model/kinesis_message_detail.rs
  14. 4
      src/model/mod.rs
  15. 23
      src/model/orchestration.rs
  16. 157
      src/orchestration.rs
  17. 91
      src/parameter_store.rs

53
.gitignore vendored

@ -0,0 +1,53 @@
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
### Rust template
# Generated by Cargo
# will have compiled files and executables
debug/
release/
target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
*.idea
.idea/
### macOS template
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
# Gerrit
/codereview
/buildSrc
detekt.yml

36
Cargo.toml

@ -0,0 +1,36 @@
[package]
name = "device-common"
version = "0.1.0"
edition = "2021"
# NOTE(review): `publish` takes registry *names* declared in .cargo/config.toml;
# a raw index URL in this list is likely ignored or rejected by cargo —
# confirm the intended registry name.
publish = ["nexus","https://maven.anypoint.tv/repository/crate/index"]
# SECURITY: the original `repository` value embedded a username and access
# token directly in the URL. The secret has been removed from the manifest;
# the leaked token must be rotated, and credentials should live in a git
# credential helper, never in version control.
authors = ["Ryan Bae <jh.bae@anypointmedia.com>"]

[dependencies]
serde = { version = "1.0.202", features = ["derive"] }
serde_json = "1.0.117"
bincode = "1.3.3"
aws-config = { version = "1.5.0" }
aws-sdk-kinesis = { version = "1.27.0", features = ["behavior-version-latest"] }
aws-sdk-ssm = "1.39"
aws-types = "1.3"
aws-credential-types = { version = "1.2.0", features = ["hardcoded-credentials"] }
deadpool-postgres = "0.14"
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-unwrap = "1.0"
tracing-appender = "0.2.3"
thiserror = "1.0"
tokio = "1.38.0"
lapin = "2.3"
sysinfo = "0.30"
prometheus = "0.13"
dashmap = "6.0"
toml = "0.8"

[profile.dev]
opt-level = 0

[profile.release]
opt-level = 3

115
src/config.rs

@ -0,0 +1,115 @@
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tracing_unwrap::ResultExt;
use tokio::sync::OnceCell;
use crate::constants;
use crate::parameter_store::get_parameter;
/// Log settings resolved from Parameter Store.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Log {
    /// Log level name, e.g. "debug" / "info" / "warn" (consumed by log::init).
    pub level: String,
}
/// RabbitMQ settings for the two queues this service family uses.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Rabbitmq {
    pub mapper: RabbitmqDetail,
    pub orchestration: RabbitmqDetail
}
/// Connection and routing details for a single RabbitMQ queue.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct RabbitmqDetail {
    pub host: String,
    pub port: i64,
    pub username: String,
    pub password: String,
    pub exchange: String,
    pub queue: String,
}
/// Postgres connection settings.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Database {
    pub host: String,
    pub port: i64,
    pub user: String,
    pub password: String,
    pub name: String,
}
/// Credentials and identity for one Kinesis stream.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct AwsKinesisStreamDetail {
    pub access_key: String,
    pub secret_key: String,
    pub region: String,
    pub name: String,
    pub partition_key: String,
    pub arn: String,
}
/// Kinesis streams used by the node application.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct AwsKinesisStream {
    pub node: AwsKinesisStreamDetail,
}
/// Root application configuration, assembled once in `initialize_data`
/// and shared process-wide through `get_config`.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Config {
    pub database: Database,
    pub rabbitmq: Rabbitmq,
    pub kinesis_stream: AwsKinesisStream,
}
static CONFIG: OnceCell<Arc<Config>> = OnceCell::const_new();
/// Assembles the process-wide [`Config`]: database, RabbitMQ and Kinesis
/// settings come from AWS Parameter Store via [`get_parameter`]; the AWS
/// credentials for Kinesis come from environment variables.
///
/// Panics (via tracing-unwrap) when a required parameter or env var is
/// missing, or when a numeric parameter fails to parse.
async fn initialize_data() -> Arc<Config> {
    // host/port/user/password are shared by both RabbitMQ consumers, so
    // resolve each of them once instead of issuing duplicate SSM requests.
    let rabbitmq_host = get_parameter(constants::RABBITMQ_HOST_KEY.to_string()).await;
    let rabbitmq_port: i64 = get_parameter(constants::RABBITMQ_PORT_KEY.to_string())
        .await
        .parse()
        .unwrap_or_log();
    let rabbitmq_username = get_parameter(constants::RABBITMQ_USER_KEY.to_string()).await;
    let rabbitmq_password = get_parameter(constants::RABBITMQ_PASSWORD_KEY.to_string()).await;
    let config = Config {
        database: Database {
            host: get_parameter(constants::DB_HOST_KEY.to_string()).await,
            port: get_parameter(constants::DB_PORT_KEY.to_string())
                .await
                .parse()
                .unwrap_or_log(),
            user: get_parameter(constants::DB_USER_KEY.to_string()).await,
            password: get_parameter(constants::DB_PASSWORD_KEY.to_string()).await,
            name: get_parameter(constants::DB_NAME_KEY.to_string()).await,
        },
        rabbitmq: Rabbitmq {
            mapper: RabbitmqDetail {
                host: rabbitmq_host.clone(),
                port: rabbitmq_port,
                username: rabbitmq_username.clone(),
                password: rabbitmq_password.clone(),
                exchange: get_parameter(constants::RABBITMQ_MAPPER_EXCHANGE_KEY.to_string()).await,
                queue: get_parameter(constants::RABBITMQ_MAPPER_QUEUE_KEY.to_string()).await,
            },
            orchestration: RabbitmqDetail {
                host: rabbitmq_host,
                port: rabbitmq_port,
                username: rabbitmq_username,
                password: rabbitmq_password,
                exchange: get_parameter(constants::RABBITMQ_ORCHESTRATION_EXCHANGE_KEY.to_string()).await,
                queue: get_parameter(constants::RABBITMQ_ORCHESTRATION_QUEUE_KEY.to_string()).await,
            },
        },
        kinesis_stream: AwsKinesisStream {
            node: AwsKinesisStreamDetail {
                access_key: crate::env::get(constants::ENV_AWS_ACCESS_KEY).expect_or_log("env aws_access_key not found"),
                secret_key: crate::env::get(constants::ENV_AWS_SECRET_KEY).expect_or_log("env aws_secret_key not found"),
                region: crate::env::get(constants::ENV_AWS_REGION).expect_or_log("env aws_region not found"),
                name: get_parameter(constants::KINESIS_PERSISTENCE_NAME_KEY.to_string()).await,
                partition_key: get_parameter(constants::KINESIS_PERSISTENCE_PARTITION_KEY.to_string()).await,
                arn: get_parameter(constants::KINESIS_PERSISTENCE_ARN_KEY.to_string()).await,
            },
        },
    };
    Arc::new(config)
}
/// Returns the process-wide configuration, building it on first call.
pub async fn get_config() -> Arc<Config> {
    let config = CONFIG.get_or_init(initialize_data).await;
    Arc::clone(config)
}

31
src/constants.rs

@ -0,0 +1,31 @@
// AWS Parameter Store keys (prefixed per environment by
// parameter_store::get_prefix) and environment-variable names shared by the
// flower device services.

// Per-application log level parameters (see log::get_log_level_parameter_key).
pub const GATEWAY_LOG_LEVEL_KEY: &str = "/flower/device/gateway/log/level";
pub const NODE_LOG_LEVEL_KEY: &str = "/flower/device/node/log/level";
pub const TICKET_LOG_LEVEL_KEY: &str = "/flower/device/ticket/log/level";
pub const FAILOVER_LOG_LEVEL_KEY: &str = "/flower/device/failover/log/level";
// RabbitMQ connection (shared) and per-consumer exchange/queue names.
pub const RABBITMQ_HOST_KEY: &str = "/flower/device/common/rabbitmq/host";
pub const RABBITMQ_PORT_KEY: &str = "/flower/device/common/rabbitmq/port";
pub const RABBITMQ_USER_KEY: &str = "/flower/device/common/rabbitmq/user";
pub const RABBITMQ_PASSWORD_KEY: &str = "/flower/device/common/rabbitmq/password";
pub const RABBITMQ_MAPPER_EXCHANGE_KEY: &str = "/flower/device/mapper/rabbitmq/exchange";
pub const RABBITMQ_MAPPER_QUEUE_KEY: &str = "/flower/device/mapper/rabbitmq/queue";
pub const RABBITMQ_ORCHESTRATION_EXCHANGE_KEY: &str = "/flower/device/orchestration/rabbitmq/exchange";
pub const RABBITMQ_ORCHESTRATION_QUEUE_KEY: &str = "/flower/device/orchestration/rabbitmq/queue";
// Kinesis stream identities (persistence stream used by config.rs; the
// failover keys are defined for consumers of this crate).
pub const KINESIS_PERSISTENCE_NAME_KEY: &str = "/flower/device/persistence/kinesis/name";
pub const KINESIS_PERSISTENCE_PARTITION_KEY: &str = "/flower/device/persistence/kinesis/partition_key";
pub const KINESIS_PERSISTENCE_ARN_KEY: &str = "/flower/device/persistence/kinesis/arn";
pub const KINESIS_FAILOVER_NAME_KEY: &str = "/flower/device/failover/kinesis/name";
pub const KINESIS_FAILOVER_PARTITION_KEY: &str = "/flower/device/failover/kinesis/partition_key";
pub const KINESIS_FAILOVER_ARN_KEY: &str = "/flower/device/failover/kinesis/arn";
// Postgres connection parameters.
pub const DB_HOST_KEY: &str = "/flower/device/common/db/host";
pub const DB_PORT_KEY: &str = "/flower/device/common/db/port";
pub const DB_USER_KEY: &str = "/flower/device/common/db/user";
pub const DB_PASSWORD_KEY: &str = "/flower/device/common/db/password";
pub const DB_NAME_KEY: &str = "/flower/device/common/db/name";
// Environment-variable names for AWS credentials (read by env::get).
pub const ENV_AWS_ACCESS_KEY: &str = "AWS_ACCESS_KEY";
pub const ENV_AWS_SECRET_KEY: &str = "AWS_SECRET_KEY";
pub const ENV_AWS_REGION: &str = "AWS_REGION";

95
src/database.rs

@ -0,0 +1,95 @@
use std::sync::Arc;
use deadpool_postgres::{Pool, Config, Runtime};
use tokio::sync::OnceCell;
use tokio_postgres::NoTls;
use crate::config::get_config;
static CONNECTION_POOL: OnceCell<Arc<Pool>> = OnceCell::const_new();
/// Builds the Postgres connection pool from the shared application config.
///
/// Panics with a descriptive message if pool construction fails
/// (previously a bare `unwrap` with no context).
async fn initialize_data() -> Arc<Pool> {
    let config = get_config().await;
    let mut pg = Config::new();
    pg.host = Some(config.database.host.clone());
    pg.user = Some(config.database.user.clone());
    pg.password = Some(config.database.password.clone());
    pg.dbname = Some(config.database.name.clone());
    // NOTE(review): port is stored as i64 in Config; values outside the u16
    // range would silently truncate here — presumably validated upstream,
    // confirm.
    pg.port = Some(config.database.port as u16);
    let pool = pg
        .create_pool(Some(Runtime::Tokio1), NoTls)
        .expect("failed to create postgres connection pool");
    Arc::new(pool)
}
/*
pub async fn initialize_data() -> Arc<Pool> {
let config = get_config().await;
let mut current_dir = current_exe().unwrap();
current_dir.pop();
current_dir.push("db.pem");
let cert_file = File::open(current_dir.display().to_string()).unwrap();
let mut buf = BufReader::new(cert_file);
let mut root_store = rustls::RootCertStore::empty();
for cert in rustls_pemfile::certs(&mut buf) {
root_store.add(cert.unwrap()).unwrap();
}
let tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let tls = MakeRustlsConnect::new(tls_config);
let mut pg = deadpool_postgres::Config::new();
pg.host = Some(config.database.host.clone());
pg.user = Some(config.database.user.clone());
pg.password = Some(config.database.password.clone());
pg.dbname = Some(config.database.name.clone());
pg.port = Some(config.database.port as u16);
let pool = pg.create_pool(Some(Runtime::Tokio1), tls).unwrap();
Arc::new(pool)
}
*/
/*
pub async fn initialize_data() -> Arc<Pool> {
let config = get_config().await;
let mut current_dir = current_exe().unwrap();
current_dir.pop();
current_dir.push("db.pem");
let cert_file = File::open(current_dir.display().to_string()).unwrap();
let mut buf = BufReader::new(cert_file);
let mut root_store = rustls::RootCertStore::empty();
for cert in rustls_pemfile::certs(&mut buf) {
root_store.add(cert.unwrap()).unwrap();
}
let tls_config = TlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()
.expect_or_log("");
let tls = MakeTlsConnector::new(tls_config);
let mut pg = deadpool_postgres::Config::new();
pg.host = Some(config.database.host.clone());
pg.user = Some(config.database.user.clone());
pg.password = Some(config.database.password.clone());
pg.dbname = Some(config.database.name.clone());
pg.port = Some(config.database.port as u16);
let pool = pg.create_pool(Some(Runtime::Tokio1), tls).unwrap();
Arc::new(pool)
}
*/
/// Returns the process-wide Postgres pool, creating it on first use.
pub async fn get_connection_pool() -> Arc<Pool> {
    let pool = CONNECTION_POOL.get_or_init(initialize_data).await;
    Arc::clone(pool)
}

18
src/env.rs

@ -0,0 +1,18 @@
use std::env;
use std::error::Error;
use std::str::FromStr;
/// Sets process environment variable `key` to `val`.
///
/// NOTE(review): mutating the environment is not thread-safe on all
/// platforms — prefer calling this before spawning threads; confirm usage.
pub fn set(key: &str, val: &str) {
    env::set_var(key, val);
}
/// Reads environment variable `key`, boxing the `VarError` on failure.
pub fn get(key: &str) -> Result<String, Box<dyn Error>> {
    env::var(key).map_err(|e| e.into())
}
/// Reads environment variable `key` and parses it as an `i64`.
/// Fails when the variable is unset or not a valid integer.
pub fn get_i64(key: &str) -> Result<i64, Box<dyn Error>> {
    let raw = env::var(key)?;
    Ok(raw.parse::<i64>()?)
}

52
src/kinesis.rs

@ -0,0 +1,52 @@
use std::sync::Arc;
use aws_config::Region;
use aws_sdk_kinesis::{Client, Config};
use aws_sdk_kinesis::primitives::Blob;
use aws_credential_types::Credentials;
use tokio::sync::OnceCell;
use crate::model::error::DeviceError;
use crate::model::kinesis_message::KinesisMessage;
static AWS_KINESIS_CONSUMER_CLIENT: OnceCell<Arc<Client>> = OnceCell::const_new();
/// Lazily builds the Kinesis client from credentials held in the shared
/// application config. Invoked once via the `OnceCell` in `get_client`.
async fn initialize_data() -> Arc<Client> {
    let config = crate::config::get_config().await;
    // Borrow straight from the cached config: the previous version cloned
    // each String only to take a reference to the temporary clone.
    let node = &config.kinesis_stream.node;
    let credentials = Credentials::from_keys(&node.access_key, &node.secret_key, None);
    let aws_config = Config::builder()
        .credentials_provider(credentials)
        .region(Region::new(node.region.clone()))
        .build();
    Arc::new(Client::from_conf(aws_config))
}
/// Returns the shared Kinesis client, constructing it on first use.
async fn get_client() -> Arc<Client> {
    let client = AWS_KINESIS_CONSUMER_CLIENT.get_or_init(initialize_data).await;
    Arc::clone(client)
}
/// Encodes `msg` and writes it as a single record to the node Kinesis
/// stream configured in the shared application config.
///
/// # Errors
/// Propagates the SDK `PutRecordError` (wrapped in `DeviceError`) when the
/// put fails.
pub async fn kinesis_put_record(msg: &KinesisMessage) -> Result<(), DeviceError> {
    let blob = msg.encode();
    let config = crate::config::get_config()
        .await
        .clone();
    get_client()
        .await
        .put_record()
        .data(Blob::new(blob))
        .partition_key(&config.kinesis_stream.node.partition_key)
        .stream_name(&config.kinesis_stream.node.name)
        .stream_arn(&config.kinesis_stream.node.arn)
        .send()
        .await?;
    Ok(())
}

11
src/lib.rs

@ -0,0 +1,11 @@
//! device-common: shared infrastructure crate (configuration, Postgres
//! pool, Kinesis client, logging, metrics, orchestration caches, and SSM
//! parameter access) for the flower device services.

pub mod config;
pub mod model;
pub mod database;
pub mod kinesis;
pub mod log;
pub mod env;
pub mod orchestration;
pub mod metrics;
pub mod parameter_store;
pub mod constants;

98
src/log.rs

@ -0,0 +1,98 @@
use std::env::current_exe;
use std::fs;
use tracing::{Level, subscriber};
use tracing_appender::non_blocking::{WorkerGuard};
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_unwrap::{OptionExt, ResultExt};
use crate::constants::{FAILOVER_LOG_LEVEL_KEY, GATEWAY_LOG_LEVEL_KEY, NODE_LOG_LEVEL_KEY, TICKET_LOG_LEVEL_KEY};
use crate::parameter_store::get_parameter;
/// Reads and parses the Cargo.toml found in the *current working
/// directory*, panicking (via tracing-unwrap) when it is missing or
/// malformed.
///
/// NOTE(review): this resolves relative to the process CWD, not the
/// executable location, so it only works when the service is launched from
/// a directory containing a Cargo.toml — deployed binaries usually don't
/// ship one. Confirm the deployment layout provides it.
fn read_cargo_toml() -> toml::Value{
    let cargo_toml_content = fs::read_to_string("Cargo.toml")
        .expect_or_log("Failed to read Cargo.toml file");
    let cargo_toml: toml::Value = toml::from_str(&cargo_toml_content)
        .expect_or_log("Failed to parse Cargo.toml");
    cargo_toml
}
/// Returns the `package.name` entry of the Cargo.toml in the working
/// directory, or `None` when the section/field is missing or the name is
/// not a string.
///
/// The previous implementation declared `Option<String>` yet panicked on
/// every missing field; it now honors its own contract and lets the caller
/// decide how to react to an unknown application name.
fn get_app_name() -> Option<String> {
    let cargo_toml = read_cargo_toml();
    cargo_toml
        .get("package")?
        .get("name")?
        .as_str()
        .map(str::to_string)
}
/// Maps the running application's crate name to its Parameter Store log
/// level key; `None` when the name is unavailable or unrecognized.
fn get_log_level_parameter_key() -> Option<String> {
    match get_app_name()?.as_str() {
        "device-gateway" => Some(GATEWAY_LOG_LEVEL_KEY.to_string()),
        "device-node" => Some(NODE_LOG_LEVEL_KEY.to_string()),
        "device-ticket" => Some(TICKET_LOG_LEVEL_KEY.to_string()),
        "device-failover" => Some(FAILOVER_LOG_LEVEL_KEY.to_string()),
        _ => None,
    }
}
/// Initializes global tracing: resolves this app's log level from
/// Parameter Store, then installs a daily-rolling, non-blocking file
/// logger under `<exe-dir>/log` with file names suffixed by
/// `log_file_name`.
///
/// Returns the `WorkerGuard` owning the background writer thread; the
/// caller must keep it alive for the process lifetime or buffered log
/// lines are dropped on exit.
///
/// Panics (via tracing-unwrap) when the app name is unknown, the appender
/// cannot be built, or a global subscriber is already installed.
pub async fn init(log_file_name: &str) -> WorkerGuard {
    // Default to INFO; unrecognized level strings (including "error" and
    // "trace") silently keep this default.
    let mut level: Level = Level::INFO;
    let log_level_parameter_key = get_log_level_parameter_key()
        .expect_or_log("log level parameter key not found");
    let log_level = get_parameter(log_level_parameter_key).await;
    match log_level.to_lowercase().as_str() {
        "debug" => { level = Level::DEBUG },
        "info" => { level = Level::INFO },
        "warn" => { level = Level::WARN },
        _ => {}
    }
    let file_appender = RollingFileAppender::builder()
        .rotation(Rotation::DAILY)
        .filename_suffix(log_file_name)
        .build(get_log_dir())
        .expect_or_log("failed to initialize rolling file appender");
    // Non-blocking writer: log calls enqueue to a worker thread owned by
    // the returned guard.
    let (file_writer, _guard) = tracing_appender::non_blocking(file_appender);
    let subscriber = tracing_subscriber::fmt()
        .with_max_level(level)
        .with_writer(file_writer)
        //.with_writer(console_writer)
        .with_ansi(false)
        .finish();
    subscriber::set_global_default(subscriber)
        .expect_or_log("trace init fail");
    _guard
}
/// Directory used for rolling log files: `<dir-of-executable>/log`.
fn get_log_dir() -> String {
    let mut path = current_exe().unwrap();
    // Replacing the final path component is equivalent to pop() + push("log").
    path.set_file_name("log");
    path.display().to_string()
}

42
src/metrics.rs

@ -0,0 +1,42 @@
use sysinfo::System;
use prometheus::{register_gauge, Gauge};
use std::sync::OnceLock;
/// Prometheus gauge for machine-wide CPU usage (percent).
/// Registered with the default registry on first access.
pub fn get_cpu_usage() -> &'static Gauge {
    static GAUGE: OnceLock<Gauge> = OnceLock::new();
    GAUGE.get_or_init(|| {
        register_gauge!("cpu_usage", "CPU Usage").unwrap()
    })
}
/// Prometheus gauge for used memory (as reported by sysinfo).
pub fn get_memory_used() -> &'static Gauge {
    static GAUGE: OnceLock<Gauge> = OnceLock::new();
    GAUGE.get_or_init(|| {
        register_gauge!("memory_used", "Memory Used").unwrap()
    })
}
/// Prometheus gauge for total memory.
pub fn get_memory_total() -> &'static Gauge {
    static GAUGE: OnceLock<Gauge> = OnceLock::new();
    GAUGE.get_or_init(|| {
        register_gauge!("memory_total", "Memory Total").unwrap()
    })
}
/// Prometheus gauge for memory usage as a percentage of total.
pub fn get_memory_usage() -> &'static Gauge {
    static GAUGE: OnceLock<Gauge> = OnceLock::new();
    GAUGE.get_or_init(|| {
        register_gauge!("memory_usage", "Memory Usage").unwrap()
    })
}
/// Refreshes system statistics and returns
/// `(cpu_usage_percent, memory_used, memory_total, memory_usage_percent)`.
pub fn collect(system: &mut System) -> (f64, f64, f64, f64) {
    system.refresh_all();
    let cpu_usage = system.global_cpu_info().cpu_usage() as f64;
    let memory_used = system.used_memory() as f64;
    let memory_total = system.total_memory() as f64;
    // Guard against a zero total (e.g. stats not yet populated), which
    // previously produced NaN for the usage percentage.
    let memory_usage = if memory_total > 0.0 {
        memory_used / memory_total * 100.0
    } else {
        0.0
    };
    (cpu_usage, memory_used, memory_total, memory_usage)
}

34
src/model/error.rs

@ -0,0 +1,34 @@
use aws_sdk_kinesis::operation::put_record::PutRecordError;
use aws_sdk_ssm::operation::get_parameter::GetParameterError;
use thiserror::Error;
use tokio_postgres::Error as PostgresError;
/// Unified error type for the device-common crate.
///
/// Wraps every fallible subsystem (I/O, env, parsing, Postgres, RabbitMQ,
/// Kinesis, SSM, JSON) so callers can propagate with `?`.
#[derive(Error, Debug)]
pub enum DeviceError {
    /// Catch-all for domain-level failures with a free-form message.
    #[error("flower-device-app / An error occurred: {0}")]
    GeneralError(String),
    #[error("I/O error")]
    IoError(#[from] std::io::Error),
    #[error("VarError")]
    VarError(#[from] std::env::VarError),
    #[error("Parse error: {0}")]
    ParseError(#[from] std::num::ParseIntError),
    #[error("Database error: {0}")]
    DatabaseError(#[from] PostgresError),
    #[error("RabbitMq error: {0}")]
    RabbitMqError(#[from] lapin::Error),
    #[error("Kinesis Stream error: {0}")]
    PutRecordError(#[from] aws_sdk_kinesis::error::SdkError<PutRecordError>),
    #[error("Parameter Store error: {0}")]
    GetParameterError(#[from] aws_sdk_ssm::error::SdkError<GetParameterError>),
    #[error("serde_json error: {0}")]
    SerdeJsonError(#[from] serde_json::error::Error)
}

52
src/model/kinesis_message.rs

@ -0,0 +1,52 @@
use std::mem::size_of;
use serde::{Deserialize, Serialize};
/// Discriminator values for the `action` byte of a `KinesisMessage`.
#[repr(u8)]
#[derive(Debug, PartialEq)]
pub enum KinesisMessageType {
    /// Device row insert-or-update event.
    DeviceUpsert = 0x01
}
/// Binary envelope written to Kinesis: a 1-byte action tag, a 4-byte
/// payload length, then the raw payload bytes (see `encode`/`decode`).
#[repr(C)]
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub struct KinesisMessage {
    pub action: u8, // data type (see KinesisMessageType)
    pub size: i32, // data size in bytes; expected to equal data.len()
    pub data: Vec<u8> // data payload
}
impl KinesisMessage {
    /// Creates a message envelope; `size` should equal `data.len()`.
    pub fn new(action: u8, size: i32, data: Vec<u8>) -> KinesisMessage {
        KinesisMessage {
            action,
            size,
            data,
        }
    }
    /// Serializes as `[action: 1 byte][size: 4 bytes][data: size bytes]`.
    ///
    /// NOTE(review): `to_ne_bytes` makes the wire format native-endian, so
    /// producer and consumer must share endianness — confirm all peers
    /// match, or migrate to `to_le_bytes` with a versioned rollout.
    pub fn encode(&self) -> Vec<u8> {
        // Preallocate the exact output size to avoid regrowth.
        let mut bin = Vec::with_capacity(size_of::<u8>() + size_of::<i32>() + self.data.len());
        bin.push(self.action);
        bin.extend_from_slice(&self.size.to_ne_bytes());
        bin.extend_from_slice(&self.data);
        bin
    }
    /// Inverse of `encode`.
    ///
    /// # Panics
    /// Panics with a descriptive message when the buffer is shorter than
    /// the header or than the declared payload size (previously an
    /// anonymous slice-index panic).
    pub fn decode(encoded: Vec<u8>) -> KinesisMessage {
        let header_len = size_of::<u8>() + size_of::<i32>();
        assert!(
            encoded.len() >= header_len,
            "KinesisMessage::decode: buffer too short for header"
        );
        let action = encoded[0];
        let size_bytes: [u8; 4] = encoded[size_of::<u8>()..header_len]
            .try_into()
            .expect("KinesisMessage::decode: malformed size field");
        let size = i32::from_ne_bytes(size_bytes);
        // A negative size wraps to a huge usize here and is rejected below.
        let end = header_len + size as usize;
        let data = encoded
            .get(header_len..end)
            .expect("KinesisMessage::decode: buffer shorter than declared size")
            .to_vec();
        KinesisMessage {
            action,
            size,
            data,
        }
    }
}

41
src/model/kinesis_message_detail.rs

@ -0,0 +1,41 @@
use serde::{Deserialize, Serialize};
/// Binary (de)serialization contract for Kinesis message payloads.
pub trait KinesisMessageDetail<T> {
    /// Serializes `self` into its on-wire byte representation.
    fn encode(&self) -> Vec<u8>;
    /// Decodes bytes produced by `encode` into a value of `T`.
    fn decode(encoded: Vec<u8>) -> T;
}
/// Payload for the DeviceUpsert Kinesis message: a device row snapshot.
#[derive(Serialize, Deserialize, PartialEq, Debug)]
#[repr(C)]
pub struct UpsertKinesisMessageDetail {
    pub finger_print: String,
    pub so_id: String,
    pub uuid: String,
    pub use_personalized_ad: bool,
    pub test: Option<bool>,
    pub zip_code: Option<String>,
    pub free_storage: i64,
    pub used_storage: i64,
    pub cached_storage: i64,
    pub model_name: String,
    pub firmware_build_date: Option<String>,
    pub firmware_ver: Option<String>,
    pub full_firmware_ver: Option<String>,
    pub app_version: String,
    pub platform_ad_id: Option<String>,
    pub a_key: Option<String>,
    pub is_exist: bool,
    pub device_id: i64
}
// NOTE(review): this impl is generic over T, so `decode::<T>` will attempt
// to bincode-deserialize the bytes into *any* T the caller names, not just
// UpsertKinesisMessageDetail — probably intended to be
// `impl KinesisMessageDetail<UpsertKinesisMessageDetail> for
// UpsertKinesisMessageDetail`. Confirm before narrowing, since callers may
// rely on the generic form. Both methods also unwrap on malformed input.
impl<T> KinesisMessageDetail<T> for UpsertKinesisMessageDetail
where
    T: Serialize + serde::de::DeserializeOwned,{
    fn encode(&self) -> Vec<u8> {
        bincode::serialize(&self).unwrap()
    }
    fn decode(encoded: Vec<u8>) -> T {
        bincode::deserialize(&encoded).unwrap()
    }
}

4
src/model/mod.rs

@ -0,0 +1,4 @@
pub mod kinesis_message_detail;
pub mod kinesis_message;
pub mod error;
pub mod orchestration;

23
src/model/orchestration.rs

@ -0,0 +1,23 @@
use serde::{Deserialize, Serialize};
/// Sharding parameters for routing devices to nodes.
#[derive(Serialize, Deserialize, Clone)]
pub struct OrchestrationConfig {
    // Modulus operand used as `quotient % device_id` in
    // orchestration::get_node_info_by_device_id.
    pub quotient: i64,
    pub shard_count: i32,
}
/// Cached row of the orchestration_node_info table.
#[derive(Serialize, Deserialize, Clone)]
pub struct OrchestrationNodeInfo {
    pub id: i64,
    pub platform_id: i32,
    // Shard remainders served by this node (stored comma-separated in the DB).
    pub remains: Vec<i64>,
    pub endpoint: String,
    pub status: i32
}
/// Cached row of the orchestration_gateway_info table.
#[derive(Serialize, Deserialize, Clone)]
pub struct OrchestrationGatewayInfo {
    pub id: i64,
    pub endpoint: String,
    pub ticket_endpoint: String,
}

157
src/orchestration.rs

@ -0,0 +1,157 @@
use std::env;
use std::str::FromStr;
use std::sync::{Arc, Mutex, OnceLock};
use dashmap::DashMap;
use tracing::info;
use crate::model::error::DeviceError;
use crate::model::orchestration::{OrchestrationConfig, OrchestrationGatewayInfo, OrchestrationNodeInfo};
/// Process-wide orchestration sharding config; zero-initialized until
/// `load_orchestration_config` populates it from the database.
pub fn get_orchestration_config() -> &'static Arc<Mutex<OrchestrationConfig>> {
    static ORCHESTRATION_CONFIG: OnceLock<Arc<Mutex<OrchestrationConfig>>> = OnceLock::new();
    ORCHESTRATION_CONFIG.get_or_init(|| Arc::new(Mutex::new(OrchestrationConfig {
        quotient: 0,
        shard_count: 0,
    })))
}
/// Process-wide cache of node rows keyed by node id.
pub fn get_orchestration_node_info() -> &'static Arc<DashMap<i64, OrchestrationNodeInfo>> {
    static ORCHESTRATION_NODE_INFO: OnceLock<Arc<DashMap<i64, OrchestrationNodeInfo>>>= OnceLock::new();
    ORCHESTRATION_NODE_INFO.get_or_init(|| Arc::new(DashMap::new()))
}
/// Process-wide cache of gateway rows keyed by gateway id.
pub fn get_orchestration_gateway_info() -> &'static Arc<DashMap<i64, OrchestrationGatewayInfo>> {
    static ORCHESTRATION_GATEWAY_INFO: OnceLock<Arc<DashMap<i64, OrchestrationGatewayInfo>>> = OnceLock::new();
    ORCHESTRATION_GATEWAY_INFO.get_or_init(|| Arc::new(DashMap::new()))
}
pub async fn load_orchestration_config() -> Result<(), DeviceError> {
let pool = crate::database::get_connection_pool().await;
let client = pool.get().await.unwrap();
let fetch_query = "SELECT * FROM orchestration_config";
let result = client
.query(fetch_query, &[])
.await?;
match result.get(0) {
None => {
return Err(DeviceError::GeneralError(String::from("empty orchestration config")))
}
Some(v) => {
let mut config = get_orchestration_config().lock().unwrap();
config.quotient = v.get("quotient");
config.shard_count = v.get("shard_count");
}
}
Ok(())
}
pub async fn load_orchestration_node_info() -> Result<(), DeviceError> {
let pool = crate::database::get_connection_pool().await;
let client = pool.get().await.unwrap();
let fetch_query = "SELECT * FROM orchestration_node_info";
let result = client
.query(fetch_query, &[])
.await?;
let cache = get_orchestration_node_info();
cache.clear();
let mut count = 0;
for row in result.into_iter() {
let id: i64 = row.get("id");
let mut node_info = OrchestrationNodeInfo {
id,
platform_id: row.get("platform_id"),
remains: vec![],
endpoint: row.get("endpoint"),
status: row.get("status"),
};
let remain_str: String = row.get("remains");
let remains: Vec<&str> = remain_str.split(',').collect();
let int_vec = remains
.iter()
.map(|s| s.parse::<i64>().expect("parse error : invalid node id"))
.collect();
node_info.remains = int_vec;
cache.insert(id, node_info);
count += 1;
}
info!("node_info loaded, count : {}", count);
Ok(())
}
pub async fn load_orchestration_gateway_info() -> Result<(), DeviceError> {
let pool = crate::database::get_connection_pool().await;
let client = pool.get().await.unwrap();
let fetch_query = "SELECT * FROM orchestration_gateway_info";
let result = client
.query(fetch_query, &[])
.await?;
let cache = get_orchestration_gateway_info();
cache.clear();
let mut count = 0;
for row in result.into_iter() {
let id: i64 = row.get("id");
let gateway_info = OrchestrationGatewayInfo {
id,
endpoint: row.get("endpoint"),
ticket_endpoint: row.get("ticket_endpoint")
};
cache.insert(id, gateway_info);
count += 1;
}
info!("gateway_info loaded, count : {}", count);
Ok(())
}
/// Looks up a node by id, returning a clone of the cached entry.
pub fn get_node_info(id: i64) -> Option<OrchestrationNodeInfo> {
    // Read-only access: `get` avoids taking the shard's write lock that the
    // previous `get_mut` acquired for a pure lookup.
    get_orchestration_node_info()
        .get(&id)
        .map(|node| node.value().clone())
}
/// Finds the node whose `remains` set contains the shard remainder
/// computed from `device_id`, cloning the matching cache entry.
///
/// NOTE(review): `quotient % device_id` panics when device_id == 0 and
/// looks like it may be reversed (`device_id % quotient` is the usual
/// sharding form) — confirm the intended semantics before changing it.
pub fn get_node_info_by_device_id(device_id: i64) -> Option<OrchestrationNodeInfo> {
    let node_cache = get_orchestration_node_info();
    // Preserves the original edge case: with an empty cache the remainder
    // was never computed, so device_id == 0 did not panic.
    if node_cache.is_empty() {
        return None;
    }
    // The remainder does not depend on the node, so compute it once here;
    // the previous version re-locked the config mutex on every iteration.
    let remain = get_orchestration_config()
        .lock()
        .unwrap()
        .quotient % device_id;
    node_cache
        .iter()
        .find(|node| node.remains.iter().any(|&x| x == remain))
        .map(|node| node.value().clone())
}
pub fn get_current_app_id() -> Result<i64, DeviceError> {
let id_str = env::var("ID")?;
let id = i64::from_str(&id_str)?;
Ok(id)
}

91
src/parameter_store.rs

@ -0,0 +1,91 @@
use std::sync::Arc;
use aws_config::meta::region::RegionProviderChain;
use aws_credential_types::Credentials;
use aws_sdk_ssm::Client;
use aws_types::region::Region;
use tokio::sync::OnceCell;
use tracing_unwrap::{OptionExt, ResultExt};
use crate::constants;
static AWS_SSM_CLIENT: OnceCell<Arc<Client>> = OnceCell::const_new();
/// Reads the AWS access key, secret key and region from the environment,
/// panicking (via tracing-unwrap) when any of them is missing.
fn get_aws_credential() -> (String, String, String) {
    let read = |key: &str, msg: &str| crate::env::get(key).expect_or_log(msg);
    (
        read(constants::ENV_AWS_ACCESS_KEY, "env not found: AWS_ACCESS_KEY"),
        read(constants::ENV_AWS_SECRET_KEY, "env not found: AWS_SECRET_KEY"),
        read(constants::ENV_AWS_REGION, "env not found: AWS_REGION"),
    )
}
/// Builds the SSM client: the region comes from the default provider
/// chain, falling back to the AWS_REGION env var; credentials come from
/// the AWS_ACCESS_KEY / AWS_SECRET_KEY env vars (no session token).
async fn initialize_data() -> Arc<Client> {
    let (access_key, secret_key, region) = get_aws_credential();
    let region_provider = RegionProviderChain::default_provider().or_else(Region::new(region));
    // The chain always terminates in the env-provided region, so this
    // unwrap only fires if that fallback itself resolves to none.
    let region = region_provider.region().await.unwrap();
    let credentials = Credentials::from_keys(
        access_key,
        secret_key,
        None,
    );
    let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
        .region(region)
        .credentials_provider(credentials)
        .load()
        .await;
    Arc::new(Client::new(&config))
}
/// Returns the shared SSM client, constructing it on first use.
async fn get_client() -> Arc<Client> {
    let client = AWS_SSM_CLIENT.get_or_init(initialize_data).await;
    Arc::clone(client)
}
/// Resolves the Parameter Store key prefix from the RUST_PROFILE
/// environment variable: the "local" profile maps to "/dev"; any other
/// value is used verbatim.
///
/// # Panics
/// Panics when RUST_PROFILE is not set.
pub fn get_prefix() -> String {
    let name = "RUST_PROFILE";
    let profile = match std::env::var(name) {
        Ok(v) => v,
        Err(e) => panic!("${} is not set ({})", name, e),
    };
    if profile == "local" {
        "/dev".to_string()
    } else {
        profile
    }
}
/// Fetches the Parameter Store value for `key`, prefixed with the
/// RUST_PROFILE-derived environment prefix (see `get_prefix`).
///
/// Panics (via tracing-unwrap) when the request fails or the parameter is
/// absent/valueless — configuration is treated as mandatory at startup.
pub async fn get_parameter(key: String) -> String {
    let key_profile = get_prefix() + &key;
    let parameter_output = get_client()
        .await
        .get_parameter()
        .name(key_profile)
        .send()
        .await
        .expect_or_log("Parameter request fail");
    let parameter = parameter_output
        .parameter()
        .expect_or_log("Parameter not found.");
    let parameter_value = parameter
        .value()
        .expect_or_log("Parameter has no value.");
    parameter_value.to_string()
}
Loading…
Cancel
Save