Browse Source

0.1.6

master
Ryan 1 year ago
parent
commit
667815897f
  1. 11
      Cargo.toml
  2. 5
      README.md
  3. 8
      RELEASE_NOTE.md
  4. 13
      src/aws/credential.rs
  5. 5
      src/aws/kinesis.rs
  6. 4
      src/aws/mod.rs
  7. 14
      src/aws/parameter_store.rs
  8. 84
      src/aws/s3.rs
  9. 2
      src/config.rs
  10. 5
      src/lib.rs
  11. 26
      src/log.rs
  12. 12
      src/model/error.rs
  13. 52
      src/model/kinesis_message.rs
  14. 41
      src/model/kinesis_message_detail.rs
  15. 2
      src/model/mod.rs

11
Cargo.toml

@ -1,16 +1,16 @@
[package]
name = "device-common"
version = "0.1.5"
version = "0.1.6"
edition = "2021"
authors = ["Ryan Bae <jh.bae@anypointmedia.com>"]
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bincode = "1.3"
aws-config = { version = "1.5" }
aws-sdk-kinesis = { version = "1.36", features = ["behavior-version-latest"] }
aws-sdk-ssm = "1.40"
aws-sdk-kinesis = { version = "1.3", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.4"
aws-sdk-ssm = "1.4"
aws-types="1.3"
aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] }
deadpool-postgres = "0.14"
@ -25,8 +25,9 @@ lapin = "2.5"
sysinfo = "0.31"
prometheus = "0.13"
dashmap = "6.0"
serial_test = "3.1"
async-trait = "0.1"
bytes = "1.7"
serial_test = "3.1"
[profile.dev]
opt-level = 0

5
README.md

@ -5,8 +5,9 @@ device-service 에서 공통으로 쓰이는 기능들의 집합
- config.rs : 필수 설정 정보를 AWS Parameter Store 로 부터 읽어온다.
- database.rs : pg 관련 기능(connection pool 등)
- env.rs : 환경 변수 관련 기능 / 환경 변수별 AWS Parameter Store 에서 쓰일 prefix 를 만든다.
- kinesis.rs : AWS Kinesis Stream 관련 기능
- log.rs : log 관련 기능 / static function 으로 사용 가능한 expect_or_log
- metrics.rs : cpu / memory 등 기본 metric 수집 기능
- orchestration.rs : orchestration 정보를 로드하는 기능
- parameter_store.rs : AWS Parameter Store 관련 기능
- aws/kinesis.rs : AWS Kinesis Stream 관련 기능
- aws/parameter_store.rs : AWS Parameter Store 관련 기능
- aws/s3.rs : AWS S3 관련 기능

8
RELEASE_NOTE.md

@ -21,4 +21,10 @@ first deploy
0.1.5
======
- fix bug for log
- fix bug for log
0.1.6
======
- changed kinesis_message(byte struct) to string(json)
- added 'aws' directory
- added s3.rs

13
src/aws/credential.rs

@ -0,0 +1,13 @@
use tracing_unwrap::ResultExt;
use crate::constants;
pub fn get_aws_credential() -> (String, String, String) {
let access_key = crate::env::get(constants::ENV_AWS_ACCESS_KEY)
.expect_or_log("env not found: AWS_ACCESS_KEY");
let secret_key = crate::env::get(constants::ENV_AWS_SECRET_KEY)
.expect_or_log("env not found: AWS_SECRET_KEY");
let region = crate::env::get(constants::ENV_AWS_REGION)
.expect_or_log("env not found: AWS_REGION");
(access_key, secret_key, region)
}

5
src/kinesis.rs → src/aws/kinesis.rs

@ -5,7 +5,6 @@ use aws_sdk_kinesis::primitives::Blob;
use aws_credential_types::Credentials;
use tokio::sync::OnceCell;
use crate::model::error::DeviceError;
use crate::model::kinesis_message::KinesisMessage;
static AWS_KINESIS_CONSUMER_CLIENT: OnceCell<Arc<Client>> = OnceCell::const_new();
@ -32,8 +31,8 @@ async fn get_client() -> Arc<Client> {
.clone()
}
pub async fn kinesis_put_record(msg: &KinesisMessage) -> Result<(), DeviceError> {
let blob = msg.encode();
pub async fn kinesis_put_record(msg: &String) -> Result<(), DeviceError> {
let blob = msg.as_bytes();
let config = crate::config::get_config()
.await
.clone();

4
src/aws/mod.rs

@ -0,0 +1,4 @@
pub mod s3;
pub mod kinesis;
pub mod parameter_store;
mod credential;

14
src/parameter_store.rs → src/aws/parameter_store.rs

@ -5,23 +5,11 @@ use aws_sdk_ssm::Client;
use aws_types::region::Region;
use tokio::sync::OnceCell;
use tracing_unwrap::{OptionExt, ResultExt};
use crate::constants;
static AWS_SSM_CLIENT: OnceCell<Arc<Client>> = OnceCell::const_new();
fn get_aws_credential() -> (String, String, String) {
let access_key = crate::env::get(constants::ENV_AWS_ACCESS_KEY)
.expect_or_log("env not found: AWS_ACCESS_KEY");
let secret_key = crate::env::get(constants::ENV_AWS_SECRET_KEY)
.expect_or_log("env not found: AWS_SECRET_KEY");
let region = crate::env::get(constants::ENV_AWS_REGION)
.expect_or_log("env not found: AWS_REGION");
(access_key, secret_key, region)
}
async fn initialize_data() -> Arc<Client> {
let (access_key, secret_key, region) = get_aws_credential();
let (access_key, secret_key, region) = crate::aws::credential::get_aws_credential();
let region_provider = RegionProviderChain::default_provider().or_else(Region::new(region));
let region = region_provider.region().await.unwrap();

84
src/aws/s3.rs

@ -0,0 +1,84 @@
use std::sync::Arc;
use aws_config::meta::region::RegionProviderChain;
use aws_credential_types::Credentials;
use aws_types::region::Region;
use bytes::Bytes;
use tokio::sync::OnceCell;
use tracing::info;
use crate::model::error::DeviceError;
static AWS_S3_CLIENT: OnceCell<Arc<aws_sdk_s3::Client>> = OnceCell::const_new();
async fn initialize_data() -> Arc<aws_sdk_s3::Client> {
let (access_key, secret_key, region) = crate::aws::credential::get_aws_credential();
let region_provider = RegionProviderChain::default_provider().or_else(Region::new(region));
let region = region_provider.region().await.unwrap();
let credentials = Credentials::from_keys(
access_key,
secret_key,
None,
);
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region)
.credentials_provider(credentials)
.load()
.await;
Arc::new(aws_sdk_s3::Client::new(&config))
}
async fn get_client() -> Arc<aws_sdk_s3::Client> {
AWS_S3_CLIENT.get_or_init(initialize_data)
.await
.clone()
}
pub async fn read_file(bucket: &str, key: &str) -> Result<String, DeviceError> {
let client = get_client().await;
let res = client
.get_object()
.bucket(bucket)
.key(key)
.send()
.await?;
let data = res
.body
.collect()
.await?;
let data: Bytes = data.into_bytes();
if data.is_empty() {
return Err(DeviceError::GeneralError("length".to_string()))
}
Ok(String::from_utf8_lossy(&data).into_owned())
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
use crate::constants;
fn setup() {
crate::env::set(constants::ENV_AWS_ACCESS_KEY, "AKIA32XY6L3KQA4QDIAP");
crate::env::set(constants::ENV_AWS_SECRET_KEY, "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");
crate::env::set(constants::ENV_AWS_REGION, "ap-northeast-2");
}
fn teardown() {}
#[tokio::test]
#[serial]
async fn test_read_file() {
setup();
assert!(read_file("apm-bigdata", "flower-device/test.csv").await.is_ok());
teardown();
}
}

2
src/config.rs

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use tracing_unwrap::ResultExt;
use tokio::sync::OnceCell;
use crate::constants;
use crate::parameter_store::get_parameter;
use crate::aws::parameter_store::get_parameter;
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Rabbitmq {

5
src/lib.rs

@ -2,10 +2,9 @@
pub mod config;
pub mod model;
pub mod database;
pub mod kinesis;
pub mod log;
pub mod env;
pub mod orchestration;
pub mod metrics;
pub mod parameter_store;
pub mod constants;
pub mod constants;
mod aws;

26
src/log.rs

@ -1,11 +1,11 @@
use std::env::current_exe;
use std::{env, fs};
use std::fs;
use std::path::Path;
use tracing::{error, Level, subscriber};
use tracing_appender::non_blocking::{WorkerGuard};
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use crate::constants::{FAILOVER_LOG_LEVEL_KEY, GATEWAY_LOG_LEVEL_KEY, NODE_LOG_LEVEL_KEY, TICKET_LOG_LEVEL_KEY};
use crate::parameter_store::get_parameter;
use crate::aws::parameter_store::get_parameter;
fn get_log_level_parameter_key() -> Option<String> {
let mut ret: Option<String> = None;
@ -84,26 +84,4 @@ fn get_log_dir() -> String {
current_dir.pop();
current_dir.push("log");
current_dir.display().to_string()
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
fn setup() {
}
fn teardown() {
}
#[test]
#[serial]
fn test_get_log_level_parameter_key() {
setup();
assert!(get_log_level_parameter_key().is_none());
teardown();
}
}

12
src/model/error.rs

@ -1,4 +1,7 @@
use aws_sdk_kinesis::operation::put_record::PutRecordError;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::list_buckets::ListBucketsError;
use aws_sdk_s3::primitives::ByteStreamError;
use aws_sdk_ssm::operation::get_parameter::GetParameterError;
use thiserror::Error;
use tokio_postgres::Error as PostgresError;
@ -29,6 +32,15 @@ pub enum DeviceError {
#[error("Parameter Store error: {0}")]
GetParameterError(#[from] aws_sdk_ssm::error::SdkError<GetParameterError>),
#[error("S3 error: {0}")]
ListBucketsError(#[from] aws_sdk_s3::error::SdkError<ListBucketsError>),
#[error("S3 error: {0}")]
GetObjectError(#[from] aws_sdk_s3::error::SdkError<GetObjectError>),
#[error("S3 error: {0}")]
ByteStreamError(#[from] ByteStreamError),
#[error("serde_json error: {0}")]
SerdeJsonError(#[from] serde_json::error::Error)
}

52
src/model/kinesis_message.rs

@ -1,52 +0,0 @@
use std::mem::size_of;
use serde::{Deserialize, Serialize};
#[repr(u8)]
#[derive(Debug, PartialEq)]
pub enum KinesisMessageType {
DeviceUpsert = 0x01
}
#[repr(C)]
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub struct KinesisMessage {
pub action: u8, // data type
pub size: i32, // data size
pub data: Vec<u8> // data
}
impl KinesisMessage {
pub fn new(action: u8, size: i32, data: Vec<u8>) -> KinesisMessage {
KinesisMessage {
action,
size,
data,
}
}
pub fn encode(&self) -> Vec<u8> {
let mut bin = Vec::new();
bin.push(self.action);
bin.extend_from_slice(&self.size.to_ne_bytes());
bin.extend_from_slice(&self.data);
bin
}
pub fn decode(encoded: Vec<u8>) -> KinesisMessage {
let mut current_index = 0;
let action = encoded[current_index];
current_index = current_index + size_of::<u8>();
let size_vec = encoded.get(current_index..current_index + size_of::<i32>()).unwrap().try_into().unwrap();
let size = i32::from_ne_bytes(size_vec);
current_index = current_index + size_of::<i32>();
let data = encoded[current_index..current_index + size as usize].to_vec();
KinesisMessage {
action,
size,
data,
}
}
}

41
src/model/kinesis_message_detail.rs

@ -1,41 +0,0 @@
use serde::{Deserialize, Serialize};
pub trait KinesisMessageDetail<T> {
fn encode(&self) -> Vec<u8>;
fn decode(encoded: Vec<u8>) -> T;
}
#[derive(Serialize, Deserialize, PartialEq, Debug)]
#[repr(C)]
pub struct UpsertKinesisMessageDetail {
pub finger_print: String,
pub so_id: String,
pub uuid: String,
pub use_personalized_ad: bool,
pub test: Option<bool>,
pub zip_code: Option<String>,
pub free_storage: i64,
pub used_storage: i64,
pub cached_storage: i64,
pub model_name: String,
pub firmware_build_date: Option<String>,
pub firmware_ver: Option<String>,
pub full_firmware_ver: Option<String>,
pub app_version: String,
pub platform_ad_id: Option<String>,
pub a_key: Option<String>,
pub is_exist: bool,
pub device_id: i64
}
impl<T> KinesisMessageDetail<T> for UpsertKinesisMessageDetail
where
T: Serialize + serde::de::DeserializeOwned,{
fn encode(&self) -> Vec<u8> {
bincode::serialize(&self).unwrap()
}
fn decode(encoded: Vec<u8>) -> T {
bincode::deserialize(&encoded).unwrap()
}
}

2
src/model/mod.rs

@ -1,4 +1,2 @@
pub mod kinesis_message_detail;
pub mod kinesis_message;
pub mod error;
pub mod orchestration;
Loading…
Cancel
Save