这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions collector/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { ACCEPTED_MIMES } = require("./utils/constants");
const { reqBody } = require("./utils/http");
const { processSingleFile } = require("./processSingleFile");
const { processLink } = require("./processLink");
const { wipeCollectorStorage } = require("./utils/files");
const app = express();

app.use(cors({ origin: true }));
Expand Down Expand Up @@ -66,6 +67,7 @@ app.all("*", function (_, response) {

app
.listen(8888, async () => {
await wipeCollectorStorage();
console.log(`Document processor app listening on port 8888`);
})
.on("error", function (_) {
Expand Down
7 changes: 5 additions & 2 deletions collector/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@
"node": ">=18.12.1"
},
"scripts": {
"dev": "NODE_ENV=development nodemon --trace-warnings index.js",
"dev": "NODE_ENV=development nodemon --ignore hotdir --ignore storage --trace-warnings index.js",
"start": "NODE_ENV=production node index.js",
"lint": "yarn prettier --write ./processSingleFile ./processLink ./utils index.js"
},
"dependencies": {
"@googleapis/youtube": "^9.0.0",
"@xenova/transformers": "^2.11.0",
"bcrypt": "^5.1.0",
"body-parser": "^1.20.2",
"cors": "^2.8.5",
"dotenv": "^16.0.3",
"express": "^4.18.2",
"extract-zip": "^2.0.1",
"fluent-ffmpeg": "^2.1.2",
"js-tiktoken": "^1.0.8",
"langchain": "0.0.201",
"mammoth": "^1.6.0",
Expand All @@ -33,7 +35,8 @@
"pdf-parse": "^1.1.1",
"puppeteer": "^21.6.1",
"slugify": "^1.6.6",
"uuid": "^9.0.0"
"uuid": "^9.0.0",
"wavefile": "^11.0.0"
},
"devDependencies": {
"nodemon": "^2.0.22",
Expand Down
146 changes: 146 additions & 0 deletions collector/processSingleFile/convert/asAudio.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
const fs = require("fs");
const path = require("path");
const { v4 } = require("uuid");
const {
createdDate,
trashFile,
writeToServerDocuments,
} = require("../../utils/files");
const { tokenizeString } = require("../../utils/tokenizer");
const { default: slugify } = require("slugify");
const { LocalWhisper } = require("../../utils/WhisperProviders/localWhisper");

/**
 * Transcribes an uploaded audio/video file to text using the local Whisper
 * provider and writes the transcript out as a server document.
 *
 * The source file is always removed via `trashFile` once processing has
 * finished, whether it succeeded or failed.
 *
 * @param {Object} params
 * @param {string} [params.fullFilePath=""] - Absolute path of the uploaded file.
 * @param {string} [params.filename=""] - Display name of the uploaded file.
 * @returns {Promise<{success: boolean, reason: string|null}>} Outcome of the
 *   conversion; `reason` carries the failure message when `success` is false.
 */
async function asAudio({ fullFilePath = "", filename = "" }) {
  const whisper = new LocalWhisper();

  console.log(`-- Working ${filename} --`);

  let audioData;
  let transcriber;
  try {
    // Decode the audio and load the model concurrently. The promises are used
    // directly so a rejected model load surfaces here instead of leaving a
    // wrapper promise pending forever.
    [audioData, transcriber] = await Promise.all([
      convertToWavAudioData(fullFilePath),
      whisper.client(),
    ]);
  } catch (error) {
    console.error(
      `Failed to load the transcription model for ${filename}.`,
      error
    );
    trashFile(fullFilePath);
    return {
      success: false,
      reason: `Failed to load the transcription model for ${filename}.`,
    };
  }

  if (!audioData) {
    console.error(`Failed to parse content from ${filename}.`);
    trashFile(fullFilePath);
    return {
      success: false,
      reason: `Failed to parse content from ${filename}.`,
    };
  }

  console.log(`[Model Working]: Transcribing audio data to text`);
  let content;
  try {
    ({ text: content } = await transcriber(audioData, {
      chunk_length_s: 30,
      stride_length_s: 5,
    }));
  } catch (error) {
    // Without this the upload would be left behind in the hot directory.
    console.error(`Failed to transcribe ${filename}.`, error);
    trashFile(fullFilePath);
    return { success: false, reason: `Failed to transcribe ${filename}.` };
  }

  if (!content?.length) {
    console.error(`Resulting text content was empty for ${filename}.`);
    trashFile(fullFilePath);
    return { success: false, reason: `No text content found in ${filename}.` };
  }

  // Declared with `const`: the original assigned to an undeclared `data`,
  // creating an implicit global shared between calls.
  const data = {
    id: v4(),
    url: "file://" + fullFilePath,
    title: filename,
    docAuthor: "no author found",
    description: "No description found.",
    docSource: "audio file uploaded by the user.",
    chunkSource: filename,
    published: createdDate(fullFilePath),
    wordCount: content.split(" ").length,
    pageContent: content,
    token_count_estimate: tokenizeString(content).length,
  };

  writeToServerDocuments(data, `${slugify(filename)}-${data.id}`);
  trashFile(fullFilePath);
  console.log(
    `[SUCCESS]: ${filename} transcribed, converted & ready for embedding.\n`
  );
  return { success: true, reason: null };
}

/**
 * Loads an audio/video file and returns its samples as a single channel of
 * 32-bit float data resampled to 16000 Hz.
 *
 * Files that do not have a `.wav` extension are first converted to WAV with
 * ffmpeg into a temporary file under `storage/tmp`; that temporary file is
 * removed again regardless of whether conversion or reading succeeded.
 *
 * @param {string} sourcePath - Path of the file to load.
 * @returns {Promise<ArrayLike<number>|null>} The sample data, or `null` when
 *   any step fails (the error is logged).
 */
async function convertToWavAudioData(sourcePath) {
  try {
    const wavefile = require("wavefile");
    const ffmpeg = require("fluent-ffmpeg");
    const outFolder = path.resolve(__dirname, `../../storage/tmp`);
    if (!fs.existsSync(outFolder)) fs.mkdirSync(outFolder, { recursive: true });

    let buffer;
    const fileExtension = path.extname(sourcePath).toLowerCase();
    if (fileExtension !== ".wav") {
      console.log(
        `[Conversion Required] ${fileExtension} file detected - converting to .wav`
      );
      const outputFile = path.resolve(outFolder, `${v4()}.wav`);
      try {
        // Adapt ffmpeg's event API to a promise that resolves to a success flag.
        const success = await new Promise((resolve) => {
          ffmpeg(sourcePath)
            .toFormat("wav")
            .on("error", (error) => {
              console.error(`[Conversion Error] ${error.message}`);
              resolve(false);
            })
            .on("progress", (progress) =>
              console.log(
                `[Conversion Processing]: ${progress.targetSize}KB converted`
              )
            )
            .on("end", () => {
              console.log("[Conversion Complete]: File converted to .wav!");
              resolve(true);
            })
            .save(outputFile);
        });
        if (!success)
          throw new Error(
            "[Conversion Failed]: Could not convert file to .wav format!"
          );

        buffer = await fs.promises.readFile(outputFile);
      } finally {
        // Always clean up the temporary file, including partial output left
        // by a failed conversion. `force` ignores a file that was never made.
        fs.rmSync(outputFile, { force: true });
      }
    } else {
      buffer = await fs.promises.readFile(sourcePath);
    }

    const wavFile = new wavefile.WaveFile(buffer);
    wavFile.toBitDepth("32f");
    wavFile.toSampleRate(16000);

    let audioData = wavFile.getSamples();
    if (Array.isArray(audioData)) {
      if (audioData.length > 1) {
        const SCALING_FACTOR = Math.sqrt(2);

        // Merge channels into first channel to save memory.
        // NOTE(review): only the first two channels are mixed; any further
        // channels are ignored - confirm this is acceptable for >2 channels.
        for (let i = 0; i < audioData[0].length; ++i) {
          audioData[0][i] =
            (SCALING_FACTOR * (audioData[0][i] + audioData[1][i])) / 2;
        }
      }
      audioData = audioData[0];
    }

    return audioData;
  } catch (error) {
    console.error(`convertToWavAudioData`, error);
    return null;
  }
}

module.exports = asAudio;
2 changes: 2 additions & 0 deletions collector/storage/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tmp/*
!tmp/.placeholder
Empty file.
59 changes: 59 additions & 0 deletions collector/utils/WhisperProviders/localWhisper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const path = require("path");
const fs = require("fs");

/**
 * Provider for a locally-run Whisper speech-recognition model loaded through
 * `@xenova/transformers`. Model files are cached under a `models` folder in
 * `STORAGE_DIR` when that variable is set, otherwise under the server's
 * storage folder relative to this file.
 */
class LocalWhisper {
  constructor() {
    // Model Card: https://huggingface.co/Xenova/whisper-small
    this.model = "Xenova/whisper-small";

    const storageDir = process.env.STORAGE_DIR;
    const modelsFolder = storageDir
      ? path.resolve(storageDir, `models`)
      : path.resolve(__dirname, `../../../server/storage/models`);
    this.cacheDir = path.resolve(modelsFolder);

    this.modelPath = path.resolve(this.cacheDir, "Xenova", "whisper-small");

    // Existing installations may not have the cache folder yet - create it.
    if (!fs.existsSync(this.cacheDir)) {
      fs.mkdirSync(this.cacheDir, { recursive: true });
    }
  }

  /**
   * Builds the automatic-speech-recognition pipeline for this model.
   * When the model folder is not present on disk a notice is logged and
   * download progress is reported while the pipeline loads.
   *
   * @returns {Promise<Function>} The pipeline returned by the library.
   * @throws Re-throws any error raised while loading, after logging it.
   */
  async client() {
    if (!fs.existsSync(this.modelPath)) {
      console.log(
        "\x1b[34m[INFO]\x1b[0m The native whisper model has never been run and will be downloaded right now. Subsequent runs will be faster. (~250MB)\n\n"
      );
    }

    try {
      const options = { cache_dir: this.cacheDir };
      if (!fs.existsSync(this.modelPath)) {
        // Show download progress if we need to download any files
        options.progress_callback = (data) => {
          if (!data.hasOwnProperty("progress")) return;
          console.log(
            `\x1b[34m[Embedding - Downloading Model Files]\x1b[0m ${
              data.file
            } ${~~data?.progress}%`
          );
        };
      }

      // The library is ESM-only, so it is pulled in with a dynamic import.
      const { pipeline } = await import("@xenova/transformers");
      return await pipeline(
        "automatic-speech-recognition",
        this.model,
        options
      );
    } catch (error) {
      console.error("Failed to load the native whisper model:", error);
      throw error;
    }
  }
}

module.exports = {
LocalWhisper,
};
50 changes: 0 additions & 50 deletions collector/utils/asDocx.js

This file was deleted.

11 changes: 11 additions & 0 deletions collector/utils/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ const ACCEPTED_MIMES = {

"application/pdf": [".pdf"],
"application/mbox": [".mbox"],

"audio/wav": [".wav"],
"audio/mpeg": [".mp3"],

"video/mp4": [".mp4"],
"video/mpeg": [".mpeg"],
};

const SUPPORTED_FILETYPE_CONVERTERS = {
Expand All @@ -31,6 +37,11 @@ const SUPPORTED_FILETYPE_CONVERTERS = {
".odp": "./convert/asOfficeMime.js",

".mbox": "./convert/asMbox.js",

".mp3": "./convert/asAudio.js",
".wav": "./convert/asAudio.js",
".mp4": "./convert/asAudio.js",
'.mpeg': "./convert/asAudio.js",
};

module.exports = {
Expand Down
40 changes: 40 additions & 0 deletions collector/utils/files/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,48 @@ function writeToServerDocuments(
return;
}

// Wipes the collector hot directory and tmp storage. Large files that failed
// to process and could not be removed at the time are force-removed the next
// time the collector boots.
/**
 * Removes every file in the collector `hotdir` and `storage/tmp` folders,
 * keeping each folder's placeholder file. Cleanup is best-effort: a folder
 * that cannot be read, or a file that cannot be removed, is skipped.
 *
 * @returns {Promise<void>} Resolves once both folders have been processed.
 */
async function wipeCollectorStorage() {
  // Removes all entries of `directory` except `keepFile`.
  const wipeDirectory = async (directory, keepFile) => {
    let files;
    try {
      files = await fs.promises.readdir(directory);
    } catch {
      // Folder is missing or unreadable, so there is nothing to wipe. The
      // original fell through here and iterated `undefined`, which threw
      // inside the readdir callback.
      return;
    }

    for (const file of files) {
      if (file === keepFile) continue;
      try {
        fs.rmSync(path.join(directory, file));
      } catch {
        // Deliberately ignored: one stubborn entry must not stop the wipe.
      }
    }
  };

  await Promise.all([
    wipeDirectory(path.resolve(__dirname, "../../hotdir"), "__HOTDIR__.md"),
    wipeDirectory(path.resolve(__dirname, "../../storage/tmp"), ".placeholder"),
  ]);
  console.log(`Collector hot directory and tmp storage wiped!`);
  return;
}

module.exports = {
trashFile,
createdDate,
writeToServerDocuments,
wipeCollectorStorage,
};
Loading