Files
few-line-engine/src/query/executer/query.executer.service.ts

199 lines
5.3 KiB
TypeScript

import { RedisNodeService } from "./../../redisManager/redisNode/redis.node.service";
import { Inject, Injectable } from "@nestjs/common";
import { InjectRepository } from "@nestjs/typeorm";
import { Query } from "../entities/query.entity";
import { Repository } from "typeorm";
import { Vm } from "../../vm/vm.class";
import { VModule } from "src/vm/module.class";
import {
QueryResponse,
registeredModules,
registeredPlugins,
} from "src/vm/vm.constants";
import { DatabaseManagerService } from "src/databaseManager/database/database.manager.service";
import { InjectQueue } from "@nestjs/bullmq";
import { QUEUE_NAMES } from "src/queue/constants";
import { Queue, QueueEvents } from "bullmq";
import { FunctionService } from "src/query/function/function.service";
import { SessionService } from "../session/session.service";
import { TLog } from "../logger/logger.types";
/**
 * Loads stored queries and runs their source in a `Vm`, either directly
 * (`runQuery`) or by enqueueing a job on the query queue and waiting for its
 * result (`runQueryQueued`).
 *
 * Import statements found in a query's source are not executed as code: they
 * are parsed to decide which modules, plugins and project functions are
 * handed to the VM, and are stripped from the source before it runs.
 */
@Injectable()
export class QueryExecuterService {
  /**
   * Matches one import statement and captures its specifier (the quoted
   * string). Stored without the `g` flag; each use builds its own global
   * RegExp from `.source` so no `lastIndex` state is shared between calls.
   */
  private static readonly IMPORT_STATEMENT =
    /import\s+(?:[\w*\s{},]*\s+from\s+)?["']([^"']+)["'];?/;

  /** Event stream used to wait for queued jobs to finish. */
  private readonly queueEvents: QueueEvents;

  constructor(
    @InjectRepository(Query)
    readonly queryRepository: Repository<Query>,
    @Inject(FunctionService)
    private readonly functionService: FunctionService,
    readonly databaseManagerService: DatabaseManagerService,
    readonly redisNodeService: RedisNodeService,
    readonly sessionService: SessionService,
    @InjectQueue(QUEUE_NAMES.QUERY) private queryQueue: Queue
  ) {
    // NOTE(review): only the queue name is passed, no connection options —
    // confirm the resulting QueueEvents connection targets the same Redis
    // instance as the injected queue.
    this.queueEvents = new QueueEvents(this.queryQueue.name);
  }

  /**
   * Extracts the specifier of every import statement in `source`.
   *
   * @param source Query source code.
   * @returns The import specifiers in order of appearance (may be empty).
   */
  parseImports(source: string): string[] {
    const pattern = new RegExp(
      QueryExecuterService.IMPORT_STATEMENT.source,
      "g"
    );
    const specifiers: string[] = [];
    let match: RegExpExecArray | null;
    while ((match = pattern.exec(source)) !== null) {
      specifiers.push(match[1]);
    }
    return specifiers;
  }

  /**
   * Removes every import statement from `source` and trims the result.
   *
   * @param source Query source code.
   * @returns The source without import statements.
   */
  clearImports(source: string): string {
    const pattern = new RegExp(
      QueryExecuterService.IMPORT_STATEMENT.source,
      "g"
    );
    return source.replace(pattern, "").trim();
  }

  /**
   * Enqueues a query execution job and resolves with the job's return value.
   *
   * Note that the parameter order differs from `runQuery`: `log` comes
   * before `headers` and `cookies` here.
   *
   * NOTE(review): `waitUntilFinished` is called without a timeout, so this
   * presumably waits indefinitely if the job never finishes — confirm that is
   * intended.
   *
   * @param token Query id; also used as part of the job name.
   * @param queryData Payload forwarded to the job.
   * @param log Log context forwarded to the job.
   * @param headers Request headers forwarded to the job.
   * @param cookies Request cookies forwarded to the job.
   * @returns The value the job finished with.
   */
  async runQueryQueued(
    token: string,
    queryData: any,
    log: TLog,
    headers: Record<string, any> = {},
    cookies: Record<string, any> = {}
  ): Promise<QueryResponse> {
    const job = await this.queryQueue.add(
      `${new Date().getTime()}_${token}`,
      {
        token,
        queryData,
        headers,
        cookies,
        log,
      },
      {
        removeOnComplete: true,
        removeOnFail: true,
        attempts: 0,
      }
    );
    const result = await job.waitUntilFinished(this.queueEvents);
    return result;
  }

  /**
   * Loads the query identified by `token`, runs its source in a fresh VM and
   * returns the script's response with an `x-session-id` cookie attached.
   *
   * When the incoming cookies carry no `x-session-id`, a new session is
   * created for the query's project and its id is written back into
   * `cookies` (the argument object is mutated).
   *
   * @param token Id of the query to run.
   * @param queryData Payload passed to the script.
   * @param headers Request headers passed to the script.
   * @param cookies Request cookies; only `x-session-id` is read here.
   * @param log Log context passed to the script.
   * @returns The validated script response.
   * @throws Error "Query not found" when no query has the given id.
   * @throws Error when the script result does not pass `checkResponse`.
   */
  async runQuery(
    token: string,
    queryData: any,
    headers: Record<string, any> = {},
    cookies: Record<string, any> = {},
    log: TLog = null
  ): Promise<QueryResponse> {
    const query = await this.queryRepository.findOne({
      where: { id: token },
      relations: ["project"],
    });
    if (!query) {
      throw new Error("Query not found");
    }

    if (!cookies["x-session-id"]) {
      const session = await this.sessionService.create(query.project.id);
      cookies["x-session-id"] = session.sessionId;
    }

    const vm = await this.createVm(query, cookies["x-session-id"]);
    const result = await vm.runScript(
      this.clearImports(query.source),
      queryData,
      headers,
      log
    );
    // The message below says "initializing VM" although this is a check on
    // the script's result; the text is kept unchanged because callers may
    // match on it.
    if (!this.checkResponse(result)) {
      throw new Error(`Error initializing VM: ${JSON.stringify(result)}`);
    }

    if (!result.cookies || !result.cookies["x-session-id"]) {
      // Replace any missing or falsy cookie container (including `null`), so
      // the assignment below cannot throw on a non-object value.
      if (!result.cookies) {
        result.cookies = {};
      }
      // NOTE(review): assumes sessionService.get() always resolves to a
      // session object for this id — confirm for unknown or expired ids.
      result.cookies["x-session-id"] = (
        await this.sessionService.get(cookies["x-session-id"], query.project.id)
      ).sessionId;
    }
    return result;
  }

  /**
   * Builds and initializes a VM for `query`.
   *
   * Each import specifier is split on "/" into `type/name`; `module`,
   * `plugin` and `function` types select registered modules, registered
   * plugins and project functions respectively. Other types are ignored.
   *
   * @param query Query whose source declares the imports.
   * @param sessionId Session id handed to each plugin factory.
   * @returns The value resolved by `vm.init()`.
   * @throws Error when an imported module or plugin is not registered.
   */
  private async createVm(query: Query, sessionId: string = null) {
    const requested = this.parseImports(query.source).map((specifier) => {
      const [type, name] = specifier.split("/");
      return { type, name };
    });
    const namesOfType = (type: string): string[] =>
      requested.filter((imp) => imp.type === type).map((imp) => imp.name);

    const functionRecords = await this.functionService.findByNames(
      query.project.id,
      namesOfType("function")
    );
    const functions = functionRecords.map((fn) => fn.source);

    const modules = namesOfType("module").map((name) => {
      if (this.isRegistered(registeredModules, name)) {
        return new VModule(name, registeredModules[name]);
      }
      throw new Error(`Module ${name} not found`);
    });

    // Plugin factories are awaited one at a time, in import order.
    const plugins = [];
    for (const name of namesOfType("plugin")) {
      if (!this.isRegistered(registeredPlugins, name)) {
        throw new Error(`Plugin ${name} not found`);
      }
      plugins.push(await registeredPlugins[name](this, query, sessionId));
    }

    // Resource limits for the VM; their units are defined by the Vm class.
    const vm = new Vm({
      memoryLimit: 128,
      timeLimit: BigInt(100e9),
      cpuTimeLimit: BigInt(5e9),
      modules: modules,
      plugins: plugins,
      functions: functions,
    });
    return await vm.init();
  }

  /**
   * Reports whether `name` is an own, truthy entry of `registry`.
   *
   * Names come from import statements in query source, so inherited keys
   * such as `constructor` or `toString` must not count as registered entries.
   * Assumes the registries are plain objects keyed by name.
   */
  private isRegistered(registry: object, name: string): boolean {
    return (
      Object.prototype.hasOwnProperty.call(registry, name) &&
      Boolean((registry as Record<string, unknown>)[name])
    );
  }

  /**
   * Validates the shape of a script result: it must have a truthy numeric
   * `statusCode` and a truthy `response`.
   *
   * Falsy responses (for example an empty string) and a status code of 0 are
   * therefore rejected.
   */
  private checkResponse(obj: any): boolean {
    return Boolean(
      obj?.statusCode && obj.response && typeof obj.statusCode === "number"
    );
  }

  /** Closes the queue event stream when the hosting module shuts down. */
  async onModuleDestroy() {
    await this.queueEvents.close();
  }
}