diff --git a/package.json b/package.json index ff3e1c5..d714127 100644 --- a/package.json +++ b/package.json @@ -11,11 +11,13 @@ "dependencies": { "@crustio/type-definitions": "1.1.0", "@polkadot/api": "4.2.2-4", + "@types/express": "^4.17.13", "@types/seedrandom": "^2.4.28", "axios": "^0.21.1", "bignumber.js": "^9.0.1", "bluebird": "^3.7.2", "dayjs": "^1.10.6", + "express": "^4.17.2", "fs-extra": "^9.1.0", "ipfs-http-client": "^48.1.3", "joi": "^17.4.1", diff --git a/src/chain/index.ts b/src/chain/index.ts index 66a320b..d7ed213 100644 --- a/src/chain/index.ts +++ b/src/chain/index.ts @@ -313,7 +313,7 @@ export default class CrustApi { * @returns Option * @throws ApiPromise error or type conversing error */ - async maybeGetFileUsedInfo(cid: string): Promise { + public async maybeGetFileUsedInfo(cid: string): Promise { await this.withApiReady(); try { diff --git a/src/http/http-interface.ts b/src/http/http-interface.ts new file mode 100644 index 0000000..c317030 --- /dev/null +++ b/src/http/http-interface.ts @@ -0,0 +1,67 @@ +import express from 'express'; +import { Logger } from 'winston'; +import { AppContext } from '../types/context'; +import * as bodyParser from 'body-parser'; +import { FileInfo } from '../chain'; +import { createFileOrderOperator } from '../db/file-record'; +import { bytesToMb } from '../utils'; +import { ChainFileInfo } from '../types/chain'; +import BigNumber from 'bignumber.js'; + +export async function startHttp( + context: AppContext, + loggerParent: Logger, +): Promise { + const logger = loggerParent.child({ moduleId: "http" }); + const app = express(); + const PORT = 42087; + const fileOrderOp = createFileOrderOperator(context.database); + + app.use(bodyParser.json({ limit: '50mb' })); + app.use(bodyParser.urlencoded({ limit: '50mb', extended: true })); + app.use(bodyParser.json()); + + logger.info("Configure smanager interface"); + + app.post('/api/v0/insert', async (req, res) => { + try { + const cid = req.body['cid']; + if (!cid) { + return res.status(400).send('please provide cid in the request'); + } + logger.info("try to insert wanted file: %s", cid); + + const file: any = await context.api.chainApi().query.market.files(cid); // eslint-disable-line + if (file.isEmpty) { + logger.warn('wanted file %s not exist on chain', cid); + return res.status(400).send(`wanted file ${cid} not exist on chain`); + } + + const fi = file.toJSON() as any; // eslint-disable-line + const fileInfo = { + ...fi, + amount: new BigNumber(fi.amount.toString()), + } as ChainFileInfo; + + logger.info(`wanted file chain info: ${JSON.stringify(fileInfo)}`); + + const sfi: FileInfo[] = [{ + cid, + size: bytesToMb(fileInfo.file_size), + tips: fileInfo.amount.toNumber(), + owner: null, + replicas: fileInfo.reported_replica_count, + expiredAt: fileInfo.expired_at, + }]; + + await fileOrderOp.addFiles(sfi, 'wanted', true); + return res.status(200).send(`insert wanted file: ${cid} success`); + } catch (e) { + return res.status(500).send("internal server error"); + } + }); + + app.listen(PORT, () => { + logger.info(`Smanager interface run on http://localhost:${PORT}`); + }); +} diff --git a/src/main.ts b/src/main.ts index 7a9c084..9290a2c 100644 --- a/src/main.ts +++ b/src/main.ts @@ -15,6 +15,7 @@ import { SimpleTask, Task } from './types/tasks'; import { Dayjs } from './utils/datetime'; import { logger } from './utils/logger'; import { timeout, timeoutOrError } from './utils/promise-utils'; +import { startHttp } from './http/http-interface'; const MaxTickTimout = 15 * 1000; const IpfsTimeout = 8000 * 1000; // 8000s @@ -78,6 +79,10 @@ async function main() { // start tasks _.forEach(simpleTasks, (t) => t.start(context)); _.forEach(tasks, (t) => t.start(context)); + + // start http + startHttp(context, logger); + // start event loop after chain is synced await doEventLoop(context, tasks); } catch (e) { diff --git a/src/tasks/pull-scheduler-task.ts b/src/tasks/pull-scheduler-task.ts index 44b5e8e..175ed81 100644 --- a/src/tasks/pull-scheduler-task.ts +++ b/src/tasks/pull-scheduler-task.ts @@ -121,6 +121,7 @@ async function handlePulling( await fileOrderOps.updateFileInfoStatus(record.id, 'insufficient_space'); continue; } + await sealFile( context, logger, @@ -218,6 +219,16 @@ async function getOneFileByStrategy( ): Promise { const { strategy } = options; do { + // Accapt all wanted records + const wantedRecord = await getWantedPendingFile(fileOrderOps, options); + if (wantedRecord) { + if (await isSealDone(wantedRecord.cid, context.sworkerApi, logger)) { + await fileOrderOps.updateFileInfoStatus(wantedRecord.id, 'handled'); + return null; + } + return wantedRecord; + } + const record = await getPendingFile(fileOrderOps, options); if (!record) { return null; @@ -284,6 +295,25 @@ async function getFreeSpace(context: AppContext): Promise<[number, number]> { return [gbToMb(freeGBSize), gbToMb(sysFreeGBSize)]; } +async function getWantedPendingFile( + fileOrderOps: DbOrderOperator, + sealOptions: SealOption, +): DbResult { + if (sealOptions.sealLarge) { + const record = await fileOrderOps.getPendingFileRecord('wanted', false); + if (record) { + return record; + } + + return await fileOrderOps.getPendingFileRecord('wanted', true); + } + + if (sealOptions.sealSmall) { + return await fileOrderOps.getPendingFileRecord('wanted', true); + } + return null; +} + async function getPendingFile( fileOrderOps: DbOrderOperator, sealOptions: SealOption, @@ -303,7 +333,7 @@ async function getPendingFile( } if (sealSmall) { - return getPendingFileByStrategy(fileOrderOps, strategy, true); + return await getPendingFileByStrategy(fileOrderOps, strategy, true); } return null; } @@ -374,7 +404,7 @@ export async function createPullSchedulerTask( const pullingInterval = 1 * 60 * 1000; // trival, period run it if there is no pending files in the db return makeIntervalTask( - 60 * 1000, + 70 * 1000, pullingInterval, 'files-pulling', context, diff --git a/src/tasks/seal-cleanup-task.ts b/src/tasks/seal-cleanup-task.ts index b1a14ca..6ad475e 100644 --- a/src/tasks/seal-cleanup-task.ts +++ b/src/tasks/seal-cleanup-task.ts @@ -65,7 +65,7 @@ export async function createSealCleanupTask( hours: 1, }).asMilliseconds(); return makeIntervalTask( - 60 * 1000, + 40 * 1000, cleanupInterval, 'seal-cleanup', context, diff --git a/src/tasks/seal-status-updater-task.ts b/src/tasks/seal-status-updater-task.ts index 3ff62fb..3316baf 100644 --- a/src/tasks/seal-status-updater-task.ts +++ b/src/tasks/seal-status-updater-task.ts @@ -112,7 +112,7 @@ export async function createSealStatuUpdater( ): Promise { const sealStatusUpdateInterval = 2 * 60 * 1000; // update seal status every 2 minutes return makeIntervalTask( - 1 * 60 * 1000, + 50 * 1000, sealStatusUpdateInterval, 'seal-updater', context, diff --git a/src/types/indexing.d.ts b/src/types/indexing.d.ts index 479a6ee..8fd5436 100644 --- a/src/types/indexing.d.ts +++ b/src/types/indexing.d.ts @@ -2,4 +2,5 @@ // indexers types // dbScan - indexer by scanning the market.Files map // chainEvent - indexer by subscribing to the latest storage orders events -export type Indexer = 'dbScan' | 'chainEvent'; +// wanted - trigger by user +export type Indexer = 'dbScan' | 'chainEvent' | 'wanted'; diff --git a/src/utils/index.ts b/src/utils/index.ts index 9f93e46..3a44d1b 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -12,10 +12,11 @@ export * as consts from './consts'; * Parse object into JSON object * @param o any object */ -export function parseObj(o: unknown): T { +export function parseObj(o: unknown): any { // eslint-disable-line if (typeof o !== 'string') { - return o as T; + return JSON.parse(JSON.stringify(o)); } + return JSON.parse(o); }