feat: integrate BullMQ for query processing and add QueueModule

This commit is contained in:
Boris D
2025-10-06 16:32:28 +03:00
parent 39c40013b2
commit 67099969db
9 changed files with 208 additions and 14 deletions

View File

@ -1,11 +1,11 @@
import { Module } from "@nestjs/common";
import { ApiModule } from "../api/api.module";
import { QueryModule } from "src/query/query.module";
import { ProjectModule } from "src/project/project.module";
import { RedisModule } from "src/redis/redis.module";
import { DatabaseModule } from "src/database/database.module";
import { DatabaseManagerModule } from "src/databaseManager/database.manager.module";
import { RedisManagerModule } from "src/redisManager/redisManager.module";
import { QueueModule } from "src/queue/queue.module";
@Module({
imports: [
@ -14,8 +14,8 @@ import { RedisManagerModule } from "src/redisManager/redisManager.module";
DatabaseManagerModule,
ApiModule,
ProjectModule,
QueryModule,
RedisManagerModule,
QueueModule,
],
controllers: [],
providers: [],

View File

@ -45,7 +45,7 @@ export class CommandController {
@Headers() headers: Record<string, any>,
@Res() res: Response
) {
const queryResult = await this.queryExecuterService.runQuery(
const queryResult = await this.queryExecuterService.runQueryQueued(
token,
query,
headers

View File

@ -10,14 +10,22 @@ import {
registeredPlugins,
} from "src/vm/vm.constants";
import { DatabaseManagerService } from "src/databaseManager/database/database.manager.service";
import { InjectQueue } from "@nestjs/bullmq";
import { QUEUE_NAMES } from "src/queue/constants";
import { Queue, QueueEvents } from "bullmq";
@Injectable()
export class QueryExecuterService {
private queueEvents: QueueEvents;
constructor(
@InjectRepository(Query)
readonly queryRepository: Repository<Query>,
readonly databaseManagerService: DatabaseManagerService
) {}
readonly databaseManagerService: DatabaseManagerService,
@InjectQueue(QUEUE_NAMES.QUERY) private queryQueue: Queue
) {
this.queueEvents = new QueueEvents(this.queryQueue.name);
}
parseImports(source: string): string[] {
const importRegex =
@ -38,6 +46,21 @@ export class QueryExecuterService {
.trim();
}
async runQueryQueued(
token: string,
queryData: any,
headers: Record<string, any> = {}
): Promise<QueryResponse> {
const job = await this.queryQueue.add(`${new Date().getTime()}_${token}`, {
token,
queryData,
headers,
});
const result = await job.waitUntilFinished(this.queueEvents);
return result;
}
async runQuery(
token: string,
queryData: any,
@ -120,4 +143,8 @@ export class QueryExecuterService {
)
);
}
async onModuleDestroy() {
await this.queueEvents.close();
}
}

View File

@ -45,7 +45,7 @@ export class QueryController {
@Headers() headers: Record<string, any>,
@Res() res: Response
) {
const queryResult = await this.queryExecuterService.runQuery(
const queryResult = await this.queryExecuterService.runQueryQueued(
token,
query,
headers

3
src/queue/constants.ts Normal file
View File

@ -0,0 +1,3 @@
export const QUEUE_NAMES = {
QUERY: "query-queue",
};

View File

@ -0,0 +1,23 @@
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Job } from "bullmq";
import { QueryExecuterService } from "src/query/executer/query.executer.service";
import { QUEUE_NAMES } from "../constants";
export interface QueryJob {
token: string;
queryData: any;
headers: Record<string, any>;
}
@Processor(QUEUE_NAMES.QUERY, { concurrency: 5 })
export class QueryProcessor extends WorkerHost {
constructor(private readonly queryExecuterService: QueryExecuterService) {
super();
}
async process(job: Job<QueryJob>) {
const { token, queryData, headers } = job.data;
return await this.queryExecuterService.runQuery(token, queryData, headers);
}
}

21
src/queue/queue.module.ts Normal file
View File

@ -0,0 +1,21 @@
import { BullModule } from "@nestjs/bullmq";
import { forwardRef, Module } from "@nestjs/common";
import { QueryProcessor } from "./processors/query.processor";
import { QUEUE_NAMES } from "./constants";
import { QueryModule } from "src/query/query.module";
@Module({
imports: [
forwardRef(() => QueryModule),
BullModule.forRoot({
connection: {
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT) || 6379,
},
}),
BullModule.registerQueue({ name: QUEUE_NAMES.QUERY }),
],
providers: [QueryProcessor],
exports: [BullModule],
})
export class QueueModule {}