fix(basic.gblib): Fix Scheduling.

Rodrigo Rodriguez 2024-03-16 21:36:03 -03:00
parent 359c1beb02
commit bb9d8c91e6
10 changed files with 161 additions and 70 deletions
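For context: the fix lets a single .gbdialog script declare several SET SCHEDULE entries instead of just one, and the pre-processor now returns every expression it finds, one per SET SCHEDULE line. A minimal sketch of the new behaviour (mirroring the unit test added further down in this commit):

import { GBVMService } from './GBVMService';

// Each SET SCHEDULE line yields one entry, in source order.
const args = GBVMService.getSetScheduleKeywordArgs(`
SET SCHEDULE "0 0 */1 * * *"
SET SCHEDULE "0 0 */3 * * *"
`);
console.log(args.length); // 2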

.vscode/launch.json vendored

@@ -6,7 +6,7 @@
"request": "launch",
"sourceMaps": true,
"name": "Debug Program",
"runtimeExecutable": "/root/.nvm/versions/node/v19.9.0/bin/node",
"runtimeExecutable": "node",
"program": "${workspaceRoot}/boot.mjs",
"cwd": "${workspaceRoot}",
"env": {


@@ -9,8 +9,6 @@ import pjson from './package.json' assert { type: 'json' };
// Displays version of Node JS being used at runtime and others attributes.
process.stdout.write(`General Bots. BotServer@${pjson.version}, botlib@${pjson.dependencies.botlib}, botbuilder@${pjson.dependencies.botbuilder}, node@${process.version.replace('v', '')}, ${process.platform} ${process.arch} `);
os.setPriority(process.pid, os.constants.priority.PRIORITY_HIGHEST);
console.log(`\nLoading virtual machine source code files...`);
var __dirname = process.env.PWD || process.cwd();


@@ -135,6 +135,7 @@
"ffmpeg-static": "5.1.0",
"google-libphonenumber": "3.2.31",
"googleapis": "126.0.1",
"hnswlib-node": "^1.4.2",
"ibm-watson": "7.1.2",
"iso-639-1": "3.1.1",
"join-images-updated": "1.1.4",
@@ -149,6 +150,7 @@
"language-tags": "^1.0.9",
"line-replace": "2.0.1",
"lodash": "4.17.21",
"lunary": "^0.6.16",
"luxon": "3.1.0",
"mammoth": "1.7.0",
"mariadb": "3.2.2",


@@ -1346,7 +1346,7 @@ export class DialogKeywords {
public static async getProcessInfo(pid: number) {
const proc = GBServer.globals.processes[pid];
const step = proc.step;
const min = GBServer.globals.minInstances.filter(p => p.instance.instanceId == proc.instanceId)[0];
const sec = new SecService();
const user = await sec.getUserFromId(min.instance.instanceId, proc.userId);
@@ -1355,7 +1355,8 @@
min,
user,
params,
proc
proc,
step
};
}
@@ -1363,8 +1364,8 @@
* Talks to the user by using the specified text.
*/
public async talk({ pid, text }) {
GBLog.info(`BASIC: TALK '${text}'.`);
const { min, user } = await DialogKeywords.getProcessInfo(pid);
const { min, user, step } = await DialogKeywords.getProcessInfo(pid);
GBLog.info(`BASIC: TALK '${text} step:${step}'.`);
if (user) {
// TODO: const translate = user ? user.basicOptions.translatorOn : false;
@@ -1376,7 +1377,12 @@
);
GBLog.verbose(`Translated text(playMarkdown): ${text}.`);
await min.conversationalService['sendOnConversation'](min, user, text);
if (step){
await min.conversationalService.sendText(min, step, text);
}
else{
await min.conversationalService['sendOnConversation'](min, user, text);
}
}
return { status: 0 };
}


@@ -0,0 +1,19 @@
import { GBVMService } from './GBVMService';
import { expect, test } from 'vitest'
test('Default', () => {
const args = GBVMService.getSetScheduleKeywordArgs(`
SET SCHEDULE "0 0 */1 * * *"
SET SCHEDULE "0 0 */3 * * *"
SET SCHEDULE "0 0 */2 * * *"
SET SCHEDULE "0 0 */2 * * *"
SET SCHEDULE "0 0 */3 * * *"
`);
expect(args.length).toBe(5);
});
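The expressions in this test are six-field cron patterns (second, minute, hour, day of month, month, day of week), so "0 0 */1 * * *" fires at second 0, minute 0 of every hour. A quick way to check such an expression; node-cron is an assumption here, but the ScheduleItem code further down builds a scheduled: true options object that matches its API:

import cron from 'node-cron';

// true — a valid six-field expression with a leading seconds column
console.log(cron.validate('0 0 */1 * * *'));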


@@ -128,13 +128,19 @@ export class GBVMService extends GBService {
// Pre process SET SCHEDULE calls.
const schedule = GBVMService.getSetScheduleKeywordArgs(text);
const schedules = GBVMService.getSetScheduleKeywordArgs(text);
const s = new ScheduleServices();
if (schedule) {
await s.createOrUpdateSchedule(min, schedule, mainName);
} else {
await s.deleteScheduleIfAny(min, mainName);
}
await s.deleteScheduleIfAny(min, mainName);
let i = 1;
await CollectionUtil.asyncForEach(schedules, async (syntax) => {
if (s) {
await s.createOrUpdateSchedule(min, syntax, `${mainName};${i++}`);
}
});
text = text.replace(/^\s*SET SCHEDULE (.*)/gim, '');
// Write VBS file without pragma keywords.
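With the loop above, every parsed expression is persisted as its own schedule, keyed by the script's main name plus a 1-based index. A rough trace of the effect for a script whose main name is main and which declares two schedules (values illustrative):

// Previous schedules for the script are cleared first, then re-created one by one.
await s.deleteScheduleIfAny(min, 'main');
await s.createOrUpdateSchedule(min, '0 0 */1 * * *', 'main;1');
await s.createOrUpdateSchedule(min, '0 0 */3 * * *', 'main;2');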
@@ -687,13 +693,27 @@ export class GBVMService extends GBService {
return mainName.toLowerCase();
}
public static getSetScheduleKeywordArgs(code: string) {
if (!code) return null;
const keyword = /^\s*SET SCHEDULE (.*)/gim;
const result = keyword.exec(code);
return result ? result[1].replace(/\`/, '') : null;
}
public static getSetScheduleKeywordArgs(code) {
if (!code) return [];
const lines = code.split(/\n/);
const results = [];
lines.forEach(line => {
if (line.trim()) {
console.log(line);
const keyword = /\s*SET SCHEDULE (.*)/gi;
let result: any = keyword.exec(line);
if (result) {
result = result[1].replace(/\`|\"|\'/, '')
result = result.trim();
results.push(result);
}
}
});
return results;
}
private async getTextFromWord(folder: string, filename: string) {
return new Promise<string>(async (resolve, reject) => {
const path = urlJoin(folder, filename);
@@ -1141,7 +1161,7 @@ export class GBVMService extends GBService {
}
public static createProcessInfo(user: GuaribasUser, min: GBMinInstance, channel: any, executable: string) {
public static createProcessInfo(user: GuaribasUser, min: GBMinInstance, channel: any, executable: string, step = null) {
const pid = GBAdminService.getNumberIdentifier();
GBServer.globals.processes[pid] = {
pid: pid,
@@ -1149,6 +1169,7 @@
instanceId: min.instance.instanceId,
channel: channel,
roles: 'everyone',
step: step,
executable: executable
};
return pid;
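createProcessInfo now carries the optional BotBuilder step on the process record, which is what allows TALK in DialogKeywords above to reply inside the active turn. A sketch of the two call shapes seen elsewhere in this commit (both calls appear in the diff; the comments are explanatory only):

// Interactive turn (see GBMinService below): the step travels with the pid.
const interactivePid = GBVMService.createProcessInfo(user, min, step.context.activity.channelId, null, step);

// Scheduled or batch run (see ScheduleServices below): no step, so TALK falls back to sendOnConversation.
const batchPid = GBVMService.createProcessInfo(null, min, 'batch', null);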


@@ -47,30 +47,38 @@ import { GBLogEx } from '../../core.gbapp/services/GBLogEx.js';
* Basic services for BASIC manipulation.
*/
export class ScheduleServices extends GBService {
public async deleteScheduleIfAny (min: GBMinInstance, name: string) {
const task = min['scheduleMap'] ? min['scheduleMap'][name] : null;
public async deleteScheduleIfAny(min: GBMinInstance, name: string) {
if (task) {
task.destroy();
delete min['scheduleMap'][name];
}
let i = 1;
while (i <= 10) {
const task = min['scheduleMap'] ? min['scheduleMap'][name + i] : null;
const count = await GuaribasSchedule.destroy({
where: {
instanceId: min.instance.instanceId,
name: name
if (task) {
task.destroy();
const id = `${name};${i}`;
delete min['scheduleMap'][id];
const count = await GuaribasSchedule.destroy({
where: {
instanceId: min.instance.instanceId,
name: id
}
});
if (count > 0) {
GBLogEx.info(min, `BASIC: Removed ${name} SET SCHEDULE and ${count} rows from storage on: ${min.botId}...`);
}
}
});
i++;
if (count > 0) {
GBLogEx.info(min,`BASIC: Removed ${name} SET SCHEDULE and ${count} rows from storage on: ${min.botId}...`);
}
}
/**
* Finds and update user agent information to a next available person.
*/
public async createOrUpdateSchedule (min: GBMinInstance, schedule: string, name: string): Promise<GuaribasSchedule> {
public async createOrUpdateSchedule(min: GBMinInstance, schedule: string, name: string): Promise<GuaribasSchedule> {
let record = await GuaribasSchedule.findOne({
where: {
instanceId: min.instance.instanceId,
@@ -97,11 +105,22 @@ export class ScheduleServices extends GBService {
/**
* Load all cached schedule from BASIC SET SCHEDULE keyword.
*/
public async scheduleAll () {
public async scheduleAll() {
let schedules;
try {
schedules = await GuaribasSchedule.findAll();
await CollectionUtil.asyncForEach(schedules, async item => {
let i = 0;
let lastName = '';
await CollectionUtil.asyncForEach(schedules, async (item) => {
if (item.name === lastName) {
item.name = item.name + ++i;
}
else {
i = 0;
}
let min: GBMinInstance = GBServer.globals.minInstances.filter(
p => p.instance.instanceId === item.instanceId
)[0];
@@ -116,8 +135,8 @@
return schedules;
}
private ScheduleItem (item: GuaribasSchedule, min: GBMinInstance) {
GBLogEx.info(min,`Scheduling ${item.name} on ${min.botId}...`);
private ScheduleItem(item: GuaribasSchedule, min: GBMinInstance) {
GBLogEx.info(min, `Scheduling ${item.name} on ${min.botId}...`);
try {
const options = {
scheduled: true,
@@ -134,12 +153,12 @@
item.schedule,
function () {
const finalData = async () => {
let script = item.name;
let script = item.name.split(';')[0];
let min: GBMinInstance = GBServer.globals.minInstances.filter(
p => p.instance.instanceId === item.instanceId
)[0];
GBLogEx.info(min,`Running .gbdialog word ${item.name} on:${item.schedule}...`);
GBLogEx.info(min, `Running .gbdialog word ${item.name} on:${item.schedule}...`);
const pid = GBVMService.createProcessInfo(null, min, 'batch', null);
await GBVMService.callVM(script, min, null, pid);
};
@@ -149,9 +168,9 @@
},
options
);
} catch (error) {
GBLogEx.error(min,`Running .gbdialog word ${item.name} : ${error}...`);
GBLogEx.error(min, `Running .gbdialog word ${item.name} : ${error}...`);
}
}
}
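ScheduleItem registers each stored row with the cron runner and, when the job fires, strips the ;index suffix to recover the dialog name to execute. A standalone sketch of that flow; node-cron is assumed (its scheduled option matches the one built above), and the row values and timezone are illustrative:

import cron from 'node-cron';

const item = { name: 'main;2', schedule: '0 0 */3 * * *' };

cron.schedule(
  item.schedule,
  async () => {
    const script = item.name.split(';')[0]; // 'main' — the .gbdialog word to run
    // createProcessInfo(null, min, 'batch', null) then GBVMService.callVM(script, min, null, pid)
  },
  { scheduled: true, timezone: 'UTC' } // timezone is an assumption; the service builds its own options
);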


@@ -1443,9 +1443,6 @@ export class SystemKeywords {
public async setSystemPrompt({ pid, text }) {
let { min, user } = await DialogKeywords.getProcessInfo(pid);
const sec = new SecService();
if (user) {
ChatServices.userSystemPrompt[user.userSystemId] = text;


@@ -1027,7 +1027,7 @@ export class GBMinService {
let pid = step.context.activity['pid'];
if (!pid) {
pid = GBVMService.createProcessInfo(user, min, step.context.activity.channelId, null);
pid = GBVMService.createProcessInfo(user, min, step.context.activity.channelId, null, step);
}
step.context.activity['pid'] = pid;


@@ -46,13 +46,15 @@ import { DialogKeywords } from '../../basic.gblib/services/DialogKeywords.js';
import { GBVMService } from '../../basic.gblib/services/GBVMService.js';
import { GBConfigService } from '../../core.gbapp/services/GBConfigService.js';
import { GuaribasSubject } from '../../kb.gbapp/models/index.js';
import { z } from "zod";
import { Serialized } from "@langchain/core/load/serializable";
import { BaseCallbackHandler } from "@langchain/core/callbacks/base";
import { DynamicStructuredTool } from "@langchain/core/tools";
import {
BaseLLMOutputParser,
OutputParserException,
} from "@langchain/core/output_parsers";
import { ChatGeneration, Generation } from "@langchain/core/outputs";
import { LunaryHandler } from "@langchain/community/callbacks/handlers/lunary";
export interface CustomOutputParserFields { }
export type ExpectedOutput = string;
@@ -63,6 +65,28 @@ function isChatGeneration(
return "message" in llmOutput;
}
class CustomHandler extends BaseCallbackHandler {
name = "custom_handler";
handleLLMNewToken(token: string) {
GBLog.info(`LLM: token: ${JSON.stringify(token)}`);
}
handleLLMStart(llm: Serialized, _prompts: string[]) {
GBLog.info(`LLM: handleLLMStart ${JSON.stringify(llm)}, Prompts: ${_prompts.join('\n')}`);
}
handleChainStart(chain: Serialized) {
GBLog.info(`LLM: handleChainStart: ${JSON.stringify(chain)}`);
}
handleToolStart(tool: Serialized) {
GBLog.info(`LLM: handleToolStart: ${JSON.stringify(tool)}`);
}
}
const logHandler = new CustomHandler();
export class CustomLLMOutputParser extends BaseLLMOutputParser<ExpectedOutput> {
lc_namespace = ["langchain", "output_parsers"];
@@ -105,7 +129,7 @@ export class ChatServices {
private static async getRelevantContext(
vectorStore: HNSWLib,
sanitizedQuestion: string,
numDocuments: number
numDocuments: number = 10
): Promise<string> {
if (sanitizedQuestion === '') {
return '';
@@ -149,7 +173,10 @@
'Default Content Language',
GBConfigService.get('DEFAULT_CONTENT_LANGUAGE')
);
const LLMMode = min.core.getParam(
min.instance,
'Answer Mode', 'direct'
);
const docsContext = min['vectorStore'];
if (!this.memoryMap[user.userSystemId]) {
@@ -167,6 +194,7 @@
openAIApiKey: process.env.OPENAI_API_KEY,
modelName: "gpt-3.5-turbo-0125",
temperature: 0,
callbacks: [logHandler],
});
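With callbacks: [logHandler] attached, every model call now emits the start, token, chain and tool events defined in CustomHandler above through GBLog. A self-contained sketch of the same wiring (the import path is an assumption; the model settings are copied from the diff):

import { ChatOpenAI } from '@langchain/openai';

const model = new ChatOpenAI({
  openAIApiKey: process.env.OPENAI_API_KEY,
  modelName: 'gpt-3.5-turbo-0125',
  temperature: 0,
  callbacks: [logHandler], // CustomHandler instance defined earlier in this file
});

// Each invoke is now traced, token by token, in the server log.
const reply = await model.invoke('Say hello.');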
@@ -206,9 +234,13 @@
const combineDocumentsPrompt = ChatPromptTemplate.fromMessages([
AIMessagePromptTemplate.fromTemplate(
`
\n\n{context}\n\n
And using \n\n{chat_history}\n\n
rephrase the answer to the user using this context already spoken.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Use the following pieces, if any, of context to answer the question at the end.
\n\n{context}\n\n
`
),
new MessagesPlaceholder("chat_history"),
@@ -239,14 +271,14 @@
const combineDocumentsChain = RunnableSequence.from([
{
question: (output: string) => output,
question: (question: string) => question,
chat_history: async () => {
const { chat_history } = await memory.loadMemoryVariables({});
return chat_history;
},
context: async (output: string) => {
const c = await ChatServices.getRelevantContext(docsContext, output, 1);
return c ?? 'answer just with user question.';
const c = await ChatServices.getRelevantContext(docsContext, output);
return `${systemPrompt} \n ${c ? 'Use this context to answer:\n' + c: 'answer just with user question.'}`;
},
},
@@ -269,28 +301,23 @@
new StringOutputParser()
]);
const directChain = RunnableSequence.from([
const conversationalToolChain = RunnableSequence.from([
{
question: (i: { question: string }) => {
return `
${systemPrompt}
${i.question}`
},
question: (i: { question: string }) => i.question,
chat_history: async () => {
const { chat_history } = await memory.loadMemoryVariables({});
return chat_history;
},
},
questionGeneratorTemplate,
modelWithTools,
new CustomLLMOutputParser(callToolChain, docsContext?.docstore?._docs.length > 0 ? combineDocumentsChain : null),
new StringOutputParser()
]);
const direct = true;
let result;
if (direct) {
if (LLMMode === "direct") {
result = await (tools.length > 0 ? modelWithTools : model).invoke(`
${systemPrompt}
@@ -298,16 +325,18 @@
result = result.content;
}
else {
else if (LLMMode === "document") {
result = await combineDocumentsChain.invoke(question);
result = await (directChain ?? conversationalQaChain).invoke({
} else if (LLMMode === "function") {
result = await conversationalToolChain.invoke({
question,
});
}
else {
GBLog.info(`Invalid Answer Mode in Config.xlsx: ${LLMMode}.`);
}
await memory.saveContext(
{