fix(basic.gblib): Databases. #392 @othonlima.

This commit is contained in:
Rodrigo Rodriguez 2023-11-30 15:47:47 -03:00
parent 4c64bddea9
commit 99a729b7f5
4 changed files with 117 additions and 45 deletions

View file

@ -86,6 +86,7 @@
"arraybuffer-to-buffer": "0.0.7",
"async-mutex": "0.4.0",
"async-promises": "0.2.3",
"async-retry": "^1.3.3",
"basic-auth": "2.0.1",
"billboard.js": "3.6.3",
"bluebird": "3.7.2",

View file

@ -586,6 +586,7 @@ export class GBVMService extends GBService {
// Closes handles if any.
await wa.closeHandles({pid: pid});
await sys.closeHandles({pid: pid});
})();
`;

View file

@ -64,6 +64,7 @@ import mime from 'mime-types';
import exts from '../../../extensions.json' assert { type: 'json' };
import { SecService } from '../../security.gbapp/services/SecService.js';
import { GBLogEx } from '../../core.gbapp/services/GBLogEx.js';
import retry from 'async-retry';
/**
* @fileoverview General Bots server core.
@ -327,6 +328,10 @@ export class SystemKeywords {
return { url, localName };
}
public async closeHandles({ pid }) {
delete this.cachedMerge[pid];
}
public async asPDF({ pid, data }) {
let file = await this.renderTable(pid, data, true, false);
return file;
@ -673,7 +678,18 @@ export class SystemKeywords {
i++;
});
return await definition.create(dst);
let item;
await retry(
async (bail) => {
item = await definition.create(dst);
},
{
retries: 5,
}
);
return item;
}
public async saveToStorageWithJSON({ pid, table, fieldsValues, fieldsNames }): Promise<any> {
@ -1594,22 +1610,43 @@ export class SystemKeywords {
if (qs) {
options['qs'] = qs;
}
let result;
await retry(
async (bail) => {
const result = await fetch(url, options);
result = await fetch(url, options);
if (result.status === 2000) {
if (result.status === 429) {
// Token expired.
const sleep = ms => {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
};
GBLog.info(`Waiting 1min. before retrynig GET: ${url}.`);
sleep(60 * 1000);
throw new Error(`BASIC: TOO MANY REQUESTS retrying after 1(one) minute now: ${result.statusText}.`);
GBLog.info(`Expired Token for ${url}.`);
await DialogKeywords.setOption({ pid, name: `${proc.executable}-continuationToken`, value: null });
}
return null;
}
if (result.status != 200) {
throw new Error(`BASIC: GET ${result.status}: ${result.statusText}.`)
}
if (result.status === 2000) {
// Token expired.
await DialogKeywords.setOption({ pid, name: `${proc.executable}-continuationToken`, value: null });
bail(new Error(`Expired Token for ${url}.`));
}
if (result.status != 200) {
throw new Error(`BASIC: GET ${result.status}: ${result.statusText}.`);
}
},
{
retries: 5,
}
);
let res = JSON.parse(await result.text());
@ -1914,7 +1951,7 @@ export class SystemKeywords {
return letters;
}
private getTableFromName(file, min){
private getTableFromName(file, min) {
const minBoot = GBServer.globals.minBoot;
const parts = file.split('.');
const con = min[parts[0]];
@ -1926,6 +1963,8 @@ export class SystemKeywords {
}
private cachedMerge = {};
/**
* Merges a multi-value with a tabular file using BY field as key.
*
@ -1938,8 +1977,10 @@ export class SystemKeywords {
public async merge({ pid, file, data, key1, key2 }): Promise<any> {
GBLog.info(`BASIC: MERGE running on ${file} and key1: ${key1}, key2: ${key2}...`);
if (!data){
if (!this.cachedMerge[pid]) {
this.cachedMerge[pid] = { file: {} }
}
if (!data) {
GBLog.info(`BASIC: MERGE running on ${file}: NO DATA.`);
return data;
}
@ -1991,9 +2032,18 @@ export class SystemKeywords {
fieldsSizes.push(t.fieldRawAttributesMap[e].size);
})
rows = await t.findAll({});
if (rows.length > 0) {
if (!this.cachedMerge[pid][file]) {
rows = await t.findAll({});
header = Object.keys(rows[0].dataValues)
}
else {
rows = this.cachedMerge[pid][file];
header = Object.keys(rows[0])
}
if (rows.length > 0) {
}
} else {
@ -2022,25 +2072,39 @@ export class SystemKeywords {
let table = [];
let foundIndex = 0;
// Fills the row variable.
// Fills the row variable on the base dataset.
for (; foundIndex < rows.length; foundIndex++) {
let row = {};
const xlRow = rows[foundIndex];
row = xlRow.dataValues ? xlRow.dataValues : xlRow;
for (let colIndex = 0; colIndex < xlRow.length; colIndex++) {
const propertyName = header[colIndex];
let value = xlRow[colIndex];
if (value && value.charAt(0) === "'") {
if (await this.isValidDate({ pid, dt: value.substr(1) })) {
value = value.substr(1);
if (!storage || !this.cachedMerge[pid][file]) {
for (; foundIndex < rows.length; foundIndex++) {
let row = {};
const tmpRow = rows[foundIndex];
row = tmpRow.dataValues ? tmpRow.dataValues : tmpRow;
for (let colIndex = 0; colIndex < tmpRow.length; colIndex++) {
const propertyName = header[colIndex];
let value = tmpRow[colIndex];
if (value && value.charAt(0) === "'") {
if (await this.isValidDate({ pid, dt: value.substr(1) })) {
value = value.substr(1);
}
}
row[propertyName] = value;
}
row[propertyName] = value;
row['line'] = foundIndex + 1;
table.push(row);
}
if (storage) {
this.cachedMerge[pid][file] = table;
}
row['line'] = foundIndex + 1;
table.push(row);
}
else {
table = this.cachedMerge[pid][file];
}
let key1Index, key2Index;
@ -2118,7 +2182,16 @@ export class SystemKeywords {
obj[columnName] = value;
let criteria = {};
criteria[key1Original] = key1Value;
await t.update(obj, { where: criteria });
let item;
await retry(
async (bail) => {
await t.update(obj, { where: criteria });
},
{
retries: 5,
}
);
return item;
} else {
@ -2157,6 +2230,8 @@ export class SystemKeywords {
}
}
this.cachedMerge[pid][file].push(row);
if (storage) {
await this.saveToStorage({ pid, table: file, fieldsValues, fieldsNames });
}
@ -2168,13 +2243,8 @@ export class SystemKeywords {
}
}
if (table.length === 0) {
GBLog.info(`BASIC: MERGE ran but updated zero rows.`);
return null;
} else {
GBLog.info(`BASIC: MERGE updated (merges:${merges}, additions:${adds}, skipped: ${skipped}).`);
return table;
}
GBLog.info(`BASIC: MERGE updated (merges:${merges}, additions:${adds}, skipped: ${skipped}).`);
return table;
}
public async tweet({ pid, text }) {

View file

@ -109,13 +109,13 @@ export class GBServer {
// Setups global error handlers.
process.on('unhandledRejection', (err, p) => {
err = err['e'] ? err['e'] : err;
GBLog.error(`UNHANDLED_REJECTION(promises): ${err.toString()} ${err['stack'] ? '\n' + err['stack'] : ''} ${err['cause'] ? '\n' + err['cause']?.message : ''}`);
if (err['response']?.obj?.httpStatusCode === 404) {
GBLog.warn(`Check reverse proxy: ${process.env.BOT_URL} as it seems to be invalid.`);
}
});
// process.on('unhandledRejection', (err, p) => {
// err = err['e'] ? err['e'] : err;
// GBLog.error(`UNHANDLED_REJECTION(promises): ${err.toString()} ${err['stack'] ? '\n' + err['stack'] : ''} ${err['cause'] ? '\n' + err['cause']?.message : ''}`);
// if (err['response']?.obj?.httpStatusCode === 404) {
// GBLog.warn(`Check reverse proxy: ${process.env.BOT_URL} as it seems to be invalid.`);
// }
// });
process.on('uncaughtException', (err, p) => {
if (err !== null) {