+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions .github/workflows/mobilecoin-workflow-dev-setup-environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -294,17 +294,27 @@ jobs:
object_name: fog-recovery-reader-0-postgresql
src: ${{ env.PG_PATH }}/sec

- name: Generate PostgreSQL values file
run: |
mkdir -p "${VALUES_BASE_PATH}"
cat <<EOF > "${VALUES_BASE_PATH}/postgresql-values.yaml"
architecture: replication
global:
postgresql:
auth:
database: fog_recovery
existingSecret: fog-recovery-postgresql
postgresqlSharedPreloadLibraries: pgaudit,pg_stat_statements
EOF

- name: Deploy PostgreSQL instance
uses: mobilecoinofficial/gha-k8s-toolbox@v1
with:
action: helm-deploy
chart_repo: https://charts.bitnami.com/bitnami
chart_name: postgresql
chart_version: 15.2.2
chart_set: |
--set=global.postgresql.auth.existingSecret=fog-recovery-postgresql
--set=global.postgresql.auth.database=fog_recovery
--set=architecture=replication
chart_values: ${{ env.VALUES_BASE_PATH }}/postgresql-values.yaml
chart_wait_timeout: 5m
release_name: fog-recovery-postgresql
namespace: ${{ inputs.namespace }}
Expand Down
6 changes: 5 additions & 1 deletion fog/view/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use clap::Parser;
use mc_common::ResponderId;
use mc_fog_sql_recovery_db::SqlRecoveryDbConnectionConfig;
use mc_fog_uri::{FogViewRouterUri, FogViewStoreUri, FogViewUri};
use mc_util_parse::parse_duration_in_seconds;
use mc_util_parse::{parse_duration_in_millis, parse_duration_in_seconds};
use mc_util_uri::AdminUri;
use serde::Serialize;
use std::{str::FromStr, time::Duration};
Expand Down Expand Up @@ -69,6 +69,10 @@ pub struct MobileAcctViewConfig {
#[clap(long, default_value = "1000", env = "MC_BLOCK_QUERY_BATCH_SIZE")]
pub block_query_batch_size: usize,

/// Database polling interval in ms.
#[clap(long, default_value = "250", value_parser = parse_duration_in_millis, env = "MC_DB_POLLING_INTERVAL_MS")]
pub db_polling_interval_ms: Duration,

/// Determines which group of TxOuts the Fog View Store instance will
/// process.
#[clap(long, default_value = "default", env = "MC_SHARDING_STRATEGY")]
Expand Down
17 changes: 11 additions & 6 deletions fog/view/server/src/db_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ use std::{
time::Duration,
};

/// Time to wait between database fetch attempts.
pub const DB_POLL_INTERNAL: Duration = Duration::from_millis(100);

/// Approximate maximum number of ETxOutRecords we will collect inside
/// fetched_records before blocking and waiting for the enclave thread to pick
/// them up. Since DB fetching is significantlly faster than enclave insertion
/// them up. Since DB fetching is significantly faster than enclave insertion
/// we need a mechanism that prevents fetched_records from growing indefinitely.
/// This essentially caps the memory usage of the fetched_records array.
/// Assuming each ETxOutRecord is <256 bytes, this gives a worst case scenario
Expand Down Expand Up @@ -77,6 +74,7 @@ pub struct DbFetcher {
impl DbFetcher {
pub fn new<DB, SS>(
db: DB,
db_polling_interval: Duration,
readiness_indicator: ReadinessIndicator,
sharding_strategy: SS,
block_query_batch_size: usize,
Expand Down Expand Up @@ -104,6 +102,7 @@ impl DbFetcher {
.spawn(move || {
DbFetcherThread::start(
db,
db_polling_interval,
thread_stop_requested,
thread_shared_state,
thread_num_queued_records_limiter,
Expand Down Expand Up @@ -179,6 +178,7 @@ where
SS: ShardingStrategy + Clone + Send + Sync + 'static,
{
db: DB,
db_polling_interval: Duration,
stop_requested: Arc<AtomicBool>,
shared_state: Arc<Mutex<DbFetcherSharedState>>,
block_tracker: BlockTracker<SS>,
Expand All @@ -197,6 +197,7 @@ where
{
pub fn start(
db: DB,
db_polling_interval: Duration,
stop_requested: Arc<AtomicBool>,
shared_state: Arc<Mutex<DbFetcherSharedState>>,
num_queued_records_limiter: Arc<(Mutex<usize>, Condvar)>,
Expand All @@ -211,6 +212,7 @@ where
);
let thread = Self {
db,
db_polling_interval,
stop_requested,
shared_state,
block_tracker: BlockTracker::new(logger.clone(), sharding_strategy),
Expand Down Expand Up @@ -243,7 +245,7 @@ where
// loaded into the queue.
self.readiness_indicator.set_ready();

sleep(DB_POLL_INTERNAL);
sleep(self.db_polling_interval);
}
}

Expand Down Expand Up @@ -382,7 +384,7 @@ where
// We might have more work to do, we aren't sure because of the error
may_have_more_work = true;
// Let's back off for one interval when there is an error
sleep(DB_POLL_INTERNAL);
sleep(self.db_polling_interval);
}
}
}
Expand Down Expand Up @@ -415,6 +417,7 @@ mod tests {
let db = db_test_context.get_db_instance();
let db_fetcher = DbFetcher::new(
db.clone(),
Duration::from_millis(100),
Default::default(),
EpochShardingStrategy::default(),
1,
Expand Down Expand Up @@ -651,6 +654,7 @@ mod tests {
let db = db_test_context.get_db_instance();
let db_fetcher = DbFetcher::new(
db.clone(),
Duration::from_millis(100),
Default::default(),
EpochShardingStrategy::default(),
1,
Expand Down Expand Up @@ -714,6 +718,7 @@ mod tests {
let db = db_test_context.get_db_instance();
let db_fetcher = DbFetcher::new(
db.clone(),
Duration::from_millis(100),
Default::default(),
EpochShardingStrategy::default(),
1,
Expand Down
10 changes: 4 additions & 6 deletions fog/view/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,6 @@ where
logger: Logger,
}

/// How long to wait between polling db
const DB_POLL_INTERNAL: Duration = Duration::from_millis(100);

impl<E, DB, SS> DbPollThread<E, DB, SS>
where
E: ViewEnclaveProxy,
Expand Down Expand Up @@ -348,7 +345,7 @@ where
logger: Logger,
) {
log::debug!(logger, "Db poll thread started");

let polling_interval = config.db_polling_interval_ms;
let mut worker = DbPollThreadWorker::new(
config,
stop_requested,
Expand All @@ -369,7 +366,7 @@ where
WorkerTickResult::HasMoreWork => {}

WorkerTickResult::Sleep => {
sleep(DB_POLL_INTERNAL);
sleep(polling_interval);
}
}
}
Expand Down Expand Up @@ -434,7 +431,7 @@ pub enum WorkerTickResult {
Sleep,
}

/// Telemetry: block indes currently being worked on.
/// Telemetry: block index currently being worked on.
const TELEMETRY_BLOCK_INDEX_KEY: Key = telemetry_static_key!("block-index");

impl<E, DB, SS> DbPollThreadWorker<E, DB, SS>
Expand All @@ -457,6 +454,7 @@ where

let db_fetcher = DbFetcher::new(
db.clone(),
config.db_polling_interval_ms,
db_fetcher_readiness_indicator.clone(),
sharding_strategy.clone(),
config.block_query_batch_size,
Expand Down
1 change: 1 addition & 0 deletions fog/view/server/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl RouterTestEnvironment {
sharding_strategy,
postgres_config: Default::default(),
block_query_batch_size: 2,
db_polling_interval_ms: Duration::from_millis(100),
};

let enclave = SgxViewEnclave::new(
Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载