这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
2 changes: 1 addition & 1 deletion workers/loc.api/bfx.api.router/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class BfxApiRouter extends BaseBfxApiRouter {
['fundingTrades', 90],
['trades', 15],
['statusMessages', 90],
['candles', 30],
['candles', 20],
['orderTrades', 90],
['orderHistory', 90],
['activeOrders', 90],
Expand Down
9 changes: 7 additions & 2 deletions workers/loc.api/sync/currency.converter/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ const depsTypes = (TYPES) => [
TYPES.SyncSchema,
TYPES.FOREX_SYMBS,
TYPES.ALLOWED_COLLS,
TYPES.SYNC_API_METHODS
TYPES.SYNC_API_METHODS,
TYPES.Logger
]
class CurrencyConverter {
constructor (
Expand All @@ -40,7 +41,8 @@ class CurrencyConverter {
syncSchema,
FOREX_SYMBS,
ALLOWED_COLLS,
SYNC_API_METHODS
SYNC_API_METHODS,
logger
) {
this.rService = rService
this.getDataFromApi = getDataFromApi
Expand All @@ -49,6 +51,7 @@ class CurrencyConverter {
this.FOREX_SYMBS = FOREX_SYMBS
this.ALLOWED_COLLS = ALLOWED_COLLS
this.SYNC_API_METHODS = SYNC_API_METHODS
this.logger = logger

this._COLL_NAMES = {
PUBLIC_TRADES: 'publicTrades',
Expand Down Expand Up @@ -114,6 +117,8 @@ class CurrencyConverter {
return this.currenciesSynonymous
}
} catch (err) {
this.logger.debug(err)

return this.currenciesSynonymous
}
}
Expand Down
97 changes: 66 additions & 31 deletions workers/loc.api/sync/data.inserter/data.checker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,17 @@ class DataChecker {
return filterMethodCollMap(methodCollMap)
}

async checkNewPublicData () {
/*
* `authMap` can be empty
*/
async checkNewPublicData (authMap) {
const methodCollMap = this._getMethodCollMap()

if (this._isInterrupted) {
return filterMethodCollMap(methodCollMap, true)
}

await this._checkNewDataPublicArrObjType(methodCollMap)
await this._checkNewDataPublicArrObjType(authMap, methodCollMap)
await this._checkNewPublicUpdatableData(methodCollMap)

return filterMethodCollMap(methodCollMap, true)
Expand Down Expand Up @@ -179,7 +182,7 @@ class DataChecker {
schema.start.push(freshSyncUserStepData)
}

async _checkNewDataPublicArrObjType (methodCollMap) {
async _checkNewDataPublicArrObjType (authMap, methodCollMap) {
for (const [method, schema] of methodCollMap) {
if (this._isInterrupted) {
return
Expand All @@ -191,7 +194,21 @@ class DataChecker {
this._resetSyncSchemaProps(schema)

if (schema.name === this.ALLOWED_COLLS.CANDLES) {
await this._checkNewCandlesData(method, schema)
// If `authMap` is empty sync candles for all users
const _authMap = (
!(authMap instanceof Map) ||
authMap.size === 0
)
? new Map([['ALL', {}]])
: authMap

for (const authItem of _authMap) {
await this._checkNewCandlesData(
method,
schema,
authItem[1]
)
}
}
if (
schema.name === this.ALLOWED_COLLS.PUBLIC_TRADES ||
Expand Down Expand Up @@ -336,20 +353,29 @@ class DataChecker {
*/
async _checkNewCandlesData (
method,
schema
schema,
auth
) {
if (this._isInterrupted) {
return
}

const { _id: userId, subUser } = auth ?? {}
const { _id: subUserId } = subUser ?? {}
const usersFilter = Number.isInteger(userId)
? { $eq: { user_id: userId } }
: {}

const currMts = Date.now()
const firstLedgerMts = await this._getFirstLedgerMts()
const firstLedgerMts = await this._getFirstLedgerMts(usersFilter)

if (!Number.isInteger(firstLedgerMts)) {
return
}

const uniqueSymbsSet = await this._getUniqueSymbsFromLedgers()
const uniqueSymbsSet = await this._getUniqueSymbsFromLedgers(
usersFilter
)
const candlesPairsSet = new Set()

for (const symbol of uniqueSymbsSet) {
Expand Down Expand Up @@ -389,7 +415,9 @@ class DataChecker {
collName: method,
symbol,
timeframe: CANDLES_TIMEFRAME,
defaultStart: firstLedgerMts
defaultStart: firstLedgerMts,
userId,
subUserId
}
)

Expand All @@ -398,7 +426,6 @@ class DataChecker {
!syncUserStepData.isCurrStepReady
) {
schema.hasNewData = true
schema.start.push(syncUserStepData)
}

const wasStartPointChanged = this._wasStartPointChanged(
Expand All @@ -411,36 +438,38 @@ class DataChecker {
{ dayOfYear: 1 }
)

if (
!wasStartPointChanged &&
!shouldFreshSyncBeAdded
) {
continue
}

const freshSyncUserStepData = this.syncUserStepDataFactory({
...syncUserStepData.getParams(),
isBaseStepReady: true,
isCurrStepReady: true
})

if (wasStartPointChanged) {
freshSyncUserStepData.setParams({
syncUserStepData.setParams({
baseStart: firstLedgerMts,
baseEnd: syncUserStepData.baseStart,
isBaseStepReady: false
})

schema.hasNewData = true
}
if (shouldFreshSyncBeAdded) {
freshSyncUserStepData.setParams({
syncUserStepData.setParams({
currStart: lastElemMtsFromTables,
currEnd: currMts,
isCurrStepReady: false
})

schema.hasNewData = true
}

if (!schema.hasNewData) {
continue
}

// To keep flow: one candles request per currency
if (!syncUserStepData.isBaseStepReady) {
syncUserStepData.setParams({
baseEnd: syncUserStepData.currEnd ?? currMts,
isCurrStepReady: true
})
}

schema.hasNewData = true
schema.start.push(freshSyncUserStepData)
syncUserStepData.auth = auth
schema.start.push(syncUserStepData)
}
}

Expand Down Expand Up @@ -568,8 +597,11 @@ class DataChecker {
return momentDiff > allowedTimeDiff
}

async _getFirstLedgerMts () {
const firstElemFilter = { $not: { currency: 'USD' } }
async _getFirstLedgerMts (usersFilter) {
const firstElemFilter = {
...usersFilter,
$not: { currency: 'USD' }
}
const firstElemOrder = [['mts', 1]]

const tempLedgersTableName = SyncTempTablesManager.getTempTableName(
Expand Down Expand Up @@ -615,9 +647,12 @@ class DataChecker {
return mts
}

async _getUniqueSymbsFromLedgers () {
async _getUniqueSymbsFromLedgers (usersFilter) {
const ledgerParams = {
filter: { $not: { currency: this.FOREX_SYMBS } },
filter: {
...usersFilter,
$not: { currency: this.FOREX_SYMBS }
},
isDistinct: true,
projection: ['currency']
}
Expand Down
77 changes: 60 additions & 17 deletions workers/loc.api/sync/data.inserter/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ const depsTypes = (TYPES) => [
TYPES.WSEventEmitter,
TYPES.GetDataFromApi,
TYPES.SyncTempTablesManager,
TYPES.SyncUserStepManager
TYPES.SyncUserStepManager,
TYPES.Progress
]
class DataInserter extends EventEmitter {
constructor (
Expand All @@ -79,7 +80,8 @@ class DataInserter extends EventEmitter {
wsEventEmitter,
getDataFromApi,
syncTempTablesManager,
syncUserStepManager
syncUserStepManager,
progress
) {
super()

Expand All @@ -99,6 +101,7 @@ class DataInserter extends EventEmitter {
this.getDataFromApi = getDataFromApi
this.syncTempTablesManager = syncTempTablesManager
this.syncUserStepManager = syncUserStepManager
this.progress = progress

this._asyncProgressHandlers = []
this._auth = null
Expand Down Expand Up @@ -284,7 +287,7 @@ class DataInserter extends EventEmitter {
await this.wsEventEmitter
.emitSyncingStep('CHECKING_NEW_PUBLIC_DATA')
const methodCollMap = await this.dataChecker
.checkNewPublicData()
.checkNewPublicData(this._auth)
await this.syncTempTablesManager
.createTempDBStructureForCurrSync(methodCollMap)
const size = methodCollMap.size
Expand All @@ -302,6 +305,12 @@ class DataInserter extends EventEmitter {

const { type, start } = schema ?? {}

if (schema.name === this.ALLOWED_COLLS.CANDLES) {
// Considers 10 reqs/min for candles
const leftTime = Math.floor((60 / 10) * start.length * 1000)
this.progress.setCandlesLeftTime(leftTime)
}

for (const syncUserStepData of start) {
if (isInsertableArrObj(schema?.type, { isPublic: true })) {
await this._insertApiData(
Expand Down Expand Up @@ -417,7 +426,14 @@ class DataInserter extends EventEmitter {
return
}

const { userId, subUserId } = this._getUserIds(auth)
const hasCandlesSection = schema.name === this.ALLOWED_COLLS.CANDLES
const _auth = (
hasCandlesSection &&
syncUserStepData?.auth
)
? syncUserStepData.auth
: auth
const { userId, subUserId } = this._getUserIds(_auth)
await this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({
collName: methodApi,
userId,
Expand All @@ -436,7 +452,6 @@ class DataInserter extends EventEmitter {
hasTimeframe,
areAllSymbolsRequired
} = syncUserStepData
const hasCandlesSection = schema.name === this.ALLOWED_COLLS.CANDLES

const params = {}

Expand Down Expand Up @@ -926,19 +941,47 @@ class DataInserter extends EventEmitter {
continue
}

const promise = this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({
collName,
syncedAt,
...this.syncUserStepManager.wereStepsSynced(
schema.start,
{
shouldNotMtsBeChecked: isUpdatable(schema?.type),
shouldStartMtsBeChecked: schema?.name === this.ALLOWED_COLLS.STATUS_MESSAGES
}
)
}, { doNotQueueQuery: true })
const startWithAuth = schema.start
.filter((syncUserStepData) => syncUserStepData?.auth)
const startWithoutAuth = schema.start
.filter((syncUserStepData) => !syncUserStepData?.auth)

if (startWithAuth.length > 0) {
for (const syncUserStepData of startWithAuth) {
const { userId, subUserId } = this._getUserIds(syncUserStepData.auth)

const promise = this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({
collName,
userId,
subUserId,
syncedAt,
...this.syncUserStepManager.wereStepsSynced(
[syncUserStepData],
{
shouldNotMtsBeChecked: isUpdatable(schema?.type),
shouldStartMtsBeChecked: schema?.name === this.ALLOWED_COLLS.STATUS_MESSAGES
}
)
}, { doNotQueueQuery: true })

updatesForPubCollsPromises.push(promise)
}
}
if (startWithoutAuth.length > 0) {
const promise = this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({
collName,
syncedAt,
...this.syncUserStepManager.wereStepsSynced(
schema.start,
{
shouldNotMtsBeChecked: isUpdatable(schema?.type),
shouldStartMtsBeChecked: schema?.name === this.ALLOWED_COLLS.STATUS_MESSAGES
}
)
}, { doNotQueueQuery: true })

updatesForPubCollsPromises.push(promise)
updatesForPubCollsPromises.push(promise)
}
}

await Promise.all(updatesForPubCollsPromises)
Expand Down
Loading