diff --git a/.vscode/launch.json b/.vscode/launch.json index bf24a3b2..27393a06 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -16,6 +16,9 @@ } }, "args": [], + "env": { + "RUST_LOG": "debug" + }, "cwd": "${workspaceFolder}" }, { @@ -30,6 +33,9 @@ } }, "args": [], + "env": { + "RUST_LOG": "trace" + }, "cwd": "${workspaceFolder}" } ] diff --git a/Cargo.lock b/Cargo.lock index 14e94033..26b7f1d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -459,6 +459,16 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -518,6 +528,436 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-config" +version = "1.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37cf2b6af2a95a20e266782b4f76f1a5e12bf412a9db2de9c1e9123b9d8c0ad8" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 1.3.1", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf26925f4a5b59eb76722b63c2892b1d70d06fa053c72e4a100ec308c1d47bc" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-lc-rs" +version = "1.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879b6c89592deb404ba4dc0ae6b58ffd1795c78991cbb5b8bc441c48a070440d" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "107a4e9d9cab9963e04e84bb8dee0e25f2a987f9a8bad5ed054abd439caa8f8c" +dependencies = [ + "bindgen", + "cc", + "cmake", + "dunce", + "fs_extra", +] + +[[package]] +name = "aws-runtime" +version = "1.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa006bb32360ed90ac51203feafb9d02e3d21046e1fd3a450a404b90ea73e5d" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.109.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c6d81b75f8ff78882e70c5909804b44553d56136899fb4015a0a68ecc870e0e" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + 
"aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand", + "hex", + "hmac", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "lru", + "percent-encoding", + "regex-lite", + "sha2", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.86.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a0abbfab841446cce6e87af853a3ba2cc1bc9afcd3f3550dd556c43d434c86d" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.89.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "695dc67bb861ccb8426c9129b91c30e266a0e3d85650cafdf62fcca14c8fd338" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.88.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d30990923f4f675523c51eb1c0dec9b752fb267b36a61e83cbc219c9d86da715" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffc03068fbb9c8dd5ce1c6fb240678a5cffb86fb2b7b1985c999c4b83c8df68" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "crypto-bigint 0.5.5", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.3.1", + "p256", + "percent-encoding", + "ring", + "sha2", + "subtle", + "time", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "127fcfad33b7dfc531141fda7e1c402ac65f88aca5511a4d31e2e3d2cd01ce9c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.63.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "165d8583d8d906e2fb5511d29201d447cc710864f075debcdd9c31c265412806" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc-fast", + "hex", + "http 0.2.12", + "http-body 0.4.6", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9656b85088f8d9dc7ad40f9a6c7228e1e8447cdf4b046c87e152e0805dea02fa" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.62.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3feafd437c763db26aa04e0cc7591185d0961e64c61885bece0fb9d50ceac671" +dependencies = [ + "aws-smithy-eventstream", + 
"aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http-client" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1053b5e587e6fa40ce5a79ea27957b04ba660baa02b28b7436f64850152234f1" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "h2 0.3.27", + "h2 0.4.12", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper 1.7.0", + "hyper-rustls 0.24.2", + "hyper-rustls 0.27.7", + "hyper-util", + "pin-project-lite", + "rustls 0.21.12", + "rustls 0.23.34", + "rustls-native-certs 0.8.2", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.4", + "tower 0.5.2", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.61.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cff418fc8ec5cadf8173b10125f05c2e7e1d46771406187b2c878557d4503390" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-observability" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d1881b1ea6d313f9890710d65c158bdab6fb08c91ea825f74c1c8c357baf4cc" +dependencies = [ + "aws-smithy-runtime-api", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d28a63441360c477465f80c7abac3b9c4d075ca638f982e605b7dc2a2c7156c9" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ab99739082da5347660c556689256438defae3bcefd66c52b095905730e404" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-client", + "aws-smithy-observability", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "http-body 1.0.1", + "pin-project-lite", + "pin-utils", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3683c5b152d2ad753607179ed71988e8cfd52964443b4f74fd8e552d0bbfeb46" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.3.1", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f5b3a7486f6690ba25952cabf1e7d75e34d69eaff5081904a47bc79074d6457" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9c34127e8c624bc2999f3b657e749c1393bedc9cd97b92a804db8ced4d2e163" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2fd329bf0e901ff3f60425691410c69094dc2a1f34b331f37bfc4e9ac1565a1" 
+dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.7.9" @@ -529,7 +969,7 @@ dependencies = [ "bytes", "futures-util", "http 1.3.1", - "http-body", + "http-body 1.0.1", "http-body-util", "itoa", "matchit", @@ -555,7 +995,7 @@ dependencies = [ "bytes", "futures-util", "http 1.3.1", - "http-body", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -566,15 +1006,10 @@ dependencies = [ ] [[package]] -name = "backon" -version = "1.6.0" +name = "base16ct" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" -dependencies = [ - "fastrand", - "gloo-timers", - "tokio", -] +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" [[package]] name = "base64" @@ -588,12 +1023,42 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "itertools 0.13.0", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", +] + [[package]] name = "bitflags" version = "2.10.0" @@ -650,6 +1115,8 @@ dependencies = [ "argon2", "async-stream", "async-trait", + "aws-config", + "aws-sdk-s3", "base64 0.22.1", "bytes", "chrono", @@ -661,6 +1128,7 @@ dependencies = [ "futures", "futures-util", "headless_chrome", + "hmac", "imap", "include_dir", "indicatif", @@ -668,9 +1136,9 @@ dependencies = [ "livekit", "log", "mailparse", + "mockito", "native-tls", "num-format", - "opendal", "pdf-extract", "qdrant-client", "rand 0.9.2", @@ -746,6 +1214,16 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "bytestring" version = "1.5.0" @@ -811,6 +1289,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom 7.1.3", +] + [[package]] name = "cff-parser" version = "0.1.0" @@ -873,6 +1360,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.50" @@ -899,6 +1397,15 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +[[package]] +name = "cmake" +version = "0.1.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" +dependencies = [ + "cc", +] + [[package]] name = "codespan-reporting" version = "0.13.1" @@ -916,6 +1423,15 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "colored" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "combine" version = "4.6.7" @@ -1049,12 +1565,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" [[package]] -name = "crc32c" -version = "0.6.8" +name = "crc-fast" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +checksum = "6bf62af4cc77d8fe1c22dde4e721d87f2f54056139d8c412e1366b740305f56f" dependencies = [ - "rustc_version", + "crc", + "digest", + "libc", + "rand 0.9.2", + "regex", ] [[package]] @@ -1078,6 +1598,28 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "crypto-bigint" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" +dependencies = [ + "generic-array", + "rand_core 0.6.4", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -1286,6 +1828,16 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26bf8fc351c5ed29b5c2f0cbbac1b209b74f60ecd62e675a998df72c49af5204" +[[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "deranged" version = "0.5.5" @@ -1417,7 +1969,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", - "const-oid", "crypto-common", "subtle", ] @@ -1433,15 +1984,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dlv-list" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" -dependencies = [ - "const-random", -] - 
[[package]] name = "dotenvy" version = "0.15.7" @@ -1496,6 +2038,12 @@ dependencies = [ "dtoa", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "ecb" version = "0.1.2" @@ -1505,6 +2053,18 @@ dependencies = [ "cipher", ] +[[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der", + "elliptic-curve", + "rfc6979", + "signature", +] + [[package]] name = "ego-tree" version = "0.6.3" @@ -1517,6 +2077,26 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" +dependencies = [ + "base16ct", + "crypto-bigint 0.4.9", + "der", + "digest", + "ff", + "generic-array", + "group", + "pkcs8", + "rand_core 0.6.4", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "email-encoding" version = "0.4.1" @@ -1608,6 +2188,16 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "find-msvc-tools" version = "0.1.4" @@ -1682,6 +2272,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futf" version = "0.1.5" @@ -1853,15 +2449,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" [[package]] -name = "gloo-timers" -version = "0.3.0" +name = "group" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", + "ff", + "rand_core 0.6.4", + "subtle", ] [[package]] @@ -1918,6 +2513,17 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.1.5", +] + [[package]] name = "hashbrown" version = "0.16.0" @@ -1974,15 +2580,6 @@ dependencies = [ "digest", ] -[[package]] -name = "home" -version = "0.5.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" -dependencies = [ - "windows-sys 0.61.2", -] - [[package]] name = "hostname" version = "0.4.1" @@ -2030,6 +2627,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" 
+version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -2049,7 +2657,7 @@ dependencies = [ "bytes", "futures-core", "http 1.3.1", - "http-body", + "http-body 1.0.1", "pin-project-lite", ] @@ -2065,6 +2673,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.7.0" @@ -2077,7 +2709,7 @@ dependencies = [ "futures-core", "h2 0.4.12", "http 1.3.1", - "http-body", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -2088,6 +2720,22 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "rustls-native-certs 0.6.3", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.7" @@ -2095,13 +2743,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ "http 1.3.1", - "hyper", + "hyper 1.7.0", "hyper-util", - "rustls", - "rustls-native-certs", + "rustls 0.23.34", + "rustls-native-certs 0.8.2", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tower-service", "webpki-roots 1.0.3", ] @@ -2112,7 +2760,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.7.0", "hyper-util", "pin-project-lite", "tokio", @@ -2127,7 +2775,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper", + "hyper 1.7.0", "hyper-util", "native-tls", "tokio", @@ -2147,8 +2795,8 @@ dependencies = [ "futures-core", "futures-util", "http 1.3.1", - "http-body", - "hyper", + "http-body 1.0.1", + "hyper 1.7.0", "ipnet", "libc", "percent-encoding", @@ -2779,6 +3427,15 @@ dependencies = [ "weezl", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -2902,6 +3559,30 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "mockito" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7760e0e418d9b7e5777c0374009ca4c93861b9066f18cb334a20ce50ab63aa48" +dependencies = [ + "assert-json-diff", + "bytes", + "colored", + "futures-util", + "http 1.3.1", + 
"http-body 1.0.1", + "http-body-util", + "hyper 1.7.0", + "hyper-util", + "log", + "rand 0.9.2", + "regex", + "serde_json", + "serde_urlencoded", + "similar", + "tokio", +] + [[package]] name = "multimap" version = "0.10.1" @@ -3044,34 +3725,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "opendal" -version = "0.54.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42afda58fa2cf50914402d132cc1caacff116a85d10c72ab2082bb7c50021754" -dependencies = [ - "anyhow", - "backon", - "base64 0.22.1", - "bytes", - "chrono", - "crc32c", - "futures", - "getrandom 0.2.16", - "http 1.3.1", - "http-body", - "log", - "md-5", - "percent-encoding", - "quick-xml 0.38.3", - "reqsign", - "reqwest", - "serde", - "serde_json", - "tokio", - "uuid", -] - [[package]] name = "openssl" version = "0.10.74" @@ -3116,16 +3769,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "ordered-multimap" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" -dependencies = [ - "dlv-list", - "hashbrown 0.14.5", -] - [[package]] name = "ouroboros" version = "0.18.5" @@ -3150,6 +3793,23 @@ dependencies = [ "syn", ] +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + +[[package]] +name = "p256" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sha2", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -3415,6 +4075,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -3649,26 +4319,6 @@ dependencies = [ "tonic", ] -[[package]] -name = "quick-xml" -version = "0.37.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" -dependencies = [ - "memchr", - "serde", -] - -[[package]] -name = "quick-xml" -version = "0.38.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "quinn" version = "0.11.9" @@ -3681,7 +4331,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls", + "rustls 0.23.34", "socket2 0.6.1", "thiserror 2.0.17", "tokio", @@ -3701,7 +4351,7 @@ dependencies = [ "rand 0.9.2", "ring", "rustc-hash", - "rustls", + "rustls 0.23.34", "rustls-pki-types", "slab", "thiserror 2.0.17", @@ -3878,35 +4528,6 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" -[[package]] -name = "reqsign" -version = "0.16.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum 
= "43451dbf3590a7590684c25fb8d12ecdcc90ed3ac123433e500447c7d77ed701" -dependencies = [ - "anyhow", - "async-trait", - "base64 0.22.1", - "chrono", - "form_urlencoded", - "getrandom 0.2.16", - "hex", - "hmac", - "home", - "http 1.3.1", - "log", - "percent-encoding", - "quick-xml 0.37.5", - "rand 0.8.5", - "reqwest", - "rust-ini", - "serde", - "serde_json", - "sha1", - "sha2", - "tokio", -] - [[package]] name = "reqwest" version = "0.12.24" @@ -3921,10 +4542,10 @@ dependencies = [ "futures-util", "h2 0.4.12", "http 1.3.1", - "http-body", + "http-body 1.0.1", "http-body-util", - "hyper", - "hyper-rustls", + "hyper 1.7.0", + "hyper-rustls 0.27.7", "hyper-tls", "hyper-util", "js-sys", @@ -3934,8 +4555,8 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", - "rustls-native-certs", + "rustls 0.23.34", + "rustls-native-certs 0.8.2", "rustls-pki-types", "serde", "serde_json", @@ -3943,7 +4564,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.2", "tower-http", @@ -3956,6 +4577,17 @@ dependencies = [ "webpki-roots 1.0.3", ] +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + [[package]] name = "rhai" version = "1.22.2" @@ -3997,16 +4629,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rust-ini" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "796e8d2b6696392a43bea58116b667fb4c29727dc5abd27d6acf338bb4f688c7" -dependencies = [ - "cfg-if", - "ordered-multimap", -] - [[package]] name = "rustc-hash" version = "2.1.1" @@ -4035,21 +4657,46 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.7", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.2" @@ -4062,6 +4709,15 @@ dependencies = [ "security-framework 3.5.1", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -4081,12 +4737,23 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + 
[[package]] name = "rustls-webpki" version = "0.103.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e10b3f4191e8a80e6b43eebabfac91e5dcecebb27a71f04e820c47ec41d314bf" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -4150,6 +4817,30 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d68f2ec51b097e4c1a75b681a8bec621909b5e91f15bb7b840c4f2f7b01148b2" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct", + "der", + "generic-array", + "pkcs8", + "subtle", + "zeroize", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -4336,12 +5027,28 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core 0.6.4", +] + [[package]] name = "simd-adler32" version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "similar" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" + [[package]] name = "siphasher" version = "0.3.11" @@ -4408,6 +5115,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "spki" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -4746,13 +5463,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls", + "rustls 0.23.34", "tokio", ] @@ -4806,19 +5533,19 @@ dependencies = [ "flate2", "h2 0.4.12", "http 1.3.1", - "http-body", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.7.0", "hyper-timeout", "hyper-util", "percent-encoding", "pin-project", "prost 0.13.5", - "rustls-native-certs", - "rustls-pemfile", + "rustls-native-certs 0.8.2", + "rustls-pemfile 2.2.0", "socket2 0.5.10", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-stream", "tower 0.4.13", "tower-layer", @@ -4871,7 +5598,7 @@ dependencies = [ "bytes", "futures-util", "http 1.3.1", - "http-body", + "http-body 1.0.1", "iri-string", "pin-project-lite", "tower 0.5.2", @@ -5087,7 +5814,7 @@ dependencies = [ "flate2", "log", "once_cell", - "rustls", + "rustls 0.23.34", "rustls-pki-types", "socks", "url", @@ -5104,8 +5831,8 @@ dependencies = [ "flate2", "log", "percent-encoding", - "rustls", - 
"rustls-pemfile", + "rustls 0.23.34", + "rustls-pemfile 2.2.0", "rustls-pki-types", "ureq-proto", "utf-8", @@ -5190,6 +5917,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "walkdir" version = "2.5.0" @@ -5764,6 +6497,12 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "xz2" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index 667fd203..546638ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,8 @@ anyhow = "1.0" argon2 = "0.5" async-stream = "0.3" async-trait = "0.1" +aws-config = "1.8.8" +aws-sdk-s3 = { version = "1.109.0", features = ["behavior-version-latest"] } base64 = "0.22" bytes = "1.8" chrono = { version = "0.4", features = ["serde"] } @@ -64,6 +66,7 @@ env_logger = "0.11" futures = "0.3" futures-util = "0.3" headless_chrome = { version = "1.0.18", optional = true } +hmac = "0.12.1" imap = { version = "3.0.0-alpha.15", optional = true } include_dir = "0.7" indicatif = "0.18.0" @@ -71,9 +74,9 @@ lettre = { version = "0.11", features = ["smtp-transport", "builder", "tokio1", livekit = "0.7" log = "0.4" mailparse = "0.15" +mockito = "1.7.0" native-tls = "0.2" num-format = "0.4" -opendal = { version = "0.54.1", features = ["services-s3"] } pdf-extract = "0.10.0" qdrant-client = { version = "1.12", optional = true } rand = "0.9.2" diff --git a/add-req.sh b/add-req.sh index 0fce6738..89f9a9f2 100755 --- a/add-req.sh +++ b/add-req.sh @@ -22,9 +22,9 @@ dirs=( # "auth" # "automation" # "basic" - # "bot" + "bot" "bootstrap" - "package_manager" + #"package_manager" # "channels" # "config" # "context" diff --git a/migrations/6.0.4.sql b/migrations/6.0.4.sql index 7847d992..c22b0e7d 100644 --- a/migrations/6.0.4.sql +++ b/migrations/6.0.4.sql @@ -183,31 +183,6 @@ BEGIN END IF; END $$; --- ============================================================================ --- DEFAULT SERVER CONFIGURATION --- Insert default values that replace .env --- ============================================================================ -INSERT INTO server_configuration (id, config_key, config_value, config_type, description) VALUES - (gen_random_uuid()::text, 'SERVER_HOST', '127.0.0.1', 'string', 'Server bind address'), - (gen_random_uuid()::text, 'SERVER_PORT', '8080', 'integer', 'Server port'), - (gen_random_uuid()::text, 'TABLES_SERVER', 'localhost', 'string', 'PostgreSQL server address'), - (gen_random_uuid()::text, 'TABLES_PORT', '5432', 'integer', 'PostgreSQL port'), - (gen_random_uuid()::text, 'TABLES_DATABASE', 'botserver', 'string', 'PostgreSQL database name'), - (gen_random_uuid()::text, 'TABLES_USERNAME', 'botserver', 'string', 'PostgreSQL username'), - (gen_random_uuid()::text, 'DRIVE_SERVER', 'localhost:9000', 'string', 'MinIO server address'), - (gen_random_uuid()::text, 'DRIVE_USE_SSL', 'false', 'boolean', 'Use SSL for drive'), - (gen_random_uuid()::text, 'DRIVE_ORG_PREFIX', 'botserver', 'string', 'Drive organization prefix'), - 
(gen_random_uuid()::text, 'DRIVE_BUCKET', 'default', 'string', 'Default S3 bucket'), - (gen_random_uuid()::text, 'VECTORDB_URL', 'http://localhost:6333', 'string', 'Qdrant vector database URL'), - (gen_random_uuid()::text, 'CACHE_URL', 'redis://localhost:6379', 'string', 'Redis cache URL'), - (gen_random_uuid()::text, 'STACK_PATH', './botserver-stack', 'string', 'Base path for all components'), - (gen_random_uuid()::text, 'SITES_ROOT', './botserver-stack/sites', 'string', 'Root path for sites') -ON CONFLICT (config_key) DO NOTHING; - --- ============================================================================ --- DEFAULT TENANT --- Create default tenant for single-tenant installations --- ============================================================================ INSERT INTO tenants (id, name, slug, is_active) VALUES (gen_random_uuid(), 'Default Tenant', 'default', true) ON CONFLICT (slug) DO NOTHING; diff --git a/migrations/6.0.6.sql b/migrations/6.0.6.sql index cee6d7f0..e69de29b 100644 --- a/migrations/6.0.6.sql +++ b/migrations/6.0.6.sql @@ -1,11 +0,0 @@ --- Migration 6.0.6: Add LLM configuration defaults --- Description: Configure local LLM server settings with model paths - --- Insert LLM configuration with defaults -INSERT INTO server_configuration (id, config_key, config_value, config_type, description) VALUES - (gen_random_uuid()::text, 'LLM_LOCAL', 'true', 'boolean', 'Enable local LLM server'), - (gen_random_uuid()::text, 'LLM_MODEL_PATH', 'botserver-stack/data/llm/DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf', 'string', 'Path to LLM model file'), - (gen_random_uuid()::text, 'LLM_URL', 'http://localhost:8081', 'string', 'Local LLM server URL'), - (gen_random_uuid()::text, 'EMBEDDING_MODEL_PATH', 'botserver-stack/data/llm/DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf', 'string', 'Path to embedding model file'), - (gen_random_uuid()::text, 'EMBEDDING_URL', 'http://localhost:8082', 'string', 'Embedding server URL') -ON CONFLICT (config_key) DO NOTHING; diff --git a/src/auth/mod.rs b/src/auth/mod.rs index 17d13bc4..1a0f2e63 100644 --- a/src/auth/mod.rs +++ b/src/auth/mod.rs @@ -1,11 +1,11 @@ -use actix_web::{web, HttpResponse, Result}; +use actix_web::{HttpRequest, HttpResponse, Result, web}; use argon2::{ password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString}, Argon2, }; use diesel::pg::PgConnection; use diesel::prelude::*; -use log::{error, warn}; +use log::{error}; use redis::Client; use std::collections::HashMap; use std::sync::Arc; @@ -148,6 +148,7 @@ impl AuthService { #[actix_web::get("/api/auth")] async fn auth_handler( + req: HttpRequest, data: web::Data, web::Query(params): web::Query>, ) -> Result { @@ -166,45 +167,10 @@ async fn auth_handler( } }; - let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") { - match Uuid::parse_str(&bot_guid) { - Ok(uuid) => uuid, - Err(e) => { - warn!("Invalid BOT_GUID from env: {}", e); - return Ok(HttpResponse::BadRequest() - .json(serde_json::json!({"error": "Invalid BOT_GUID"}))); - } - } - } else { - // BOT_GUID not set, get first available bot from database - use crate::shared::models::schema::bots::dsl::*; - use diesel::prelude::*; - - let mut db_conn = data.conn.lock().unwrap(); - match bots - .filter(is_active.eq(true)) - .select(id) - .first::(&mut *db_conn) - .optional() - { - Ok(Some(first_bot_id)) => { - log::info!( - "BOT_GUID not set, using first available bot: {}", - first_bot_id - ); - first_bot_id - } - Ok(None) => { - error!("No active bots found in database"); - return 
Ok(HttpResponse::ServiceUnavailable() - .json(serde_json::json!({"error": "No bots available"}))); - } - Err(e) => { - error!("Failed to query bots: {}", e); - return Ok(HttpResponse::InternalServerError() - .json(serde_json::json!({"error": "Failed to query bots"}))); - } - } + let mut db_conn = data.conn.lock().unwrap(); + let (bot_id, bot_name) = match crate::bot::bot_from_url(&mut *db_conn, req.path()) { + Ok((id, name)) => (id, name), + Err(res) => return Ok(res), }; let session = { @@ -224,35 +190,40 @@ async fn auth_handler( } }; - let session_id_clone = session.id.clone(); - let auth_script_path = "./templates/annoucements.gbai/annoucements.gbdialog/auth.bas"; - let auth_script = match std::fs::read_to_string(auth_script_path) { - Ok(content) => content, - Err(_) => r#"SET_USER "00000000-0000-0000-0000-000000000001""#.to_string(), - }; - - let script_service = crate::basic::ScriptService::new(Arc::clone(&data), session.clone()); - match script_service - .compile(&auth_script) - .and_then(|ast| script_service.run(&ast)) - { - Ok(result) => { - if result.to_string() == "false" { - error!("Auth script returned false, authentication failed"); - return Ok(HttpResponse::Unauthorized() - .json(serde_json::json!({"error": "Authentication failed"}))); + let auth_script_path = format!("./work/{}.gbai/{}.gbdialog/auth.ast", bot_name, bot_name); + if std::path::Path::new(&auth_script_path).exists() { + let auth_script = match std::fs::read_to_string(&auth_script_path) { + Ok(content) => content, + Err(e) => { + error!("Failed to read auth script: {}", e); + return Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Failed to read auth script"}))); + } + }; + + let script_service = crate::basic::ScriptService::new(Arc::clone(&data), session.clone()); + match script_service + .compile(&auth_script) + .and_then(|ast| script_service.run(&ast)) + { + Ok(result) => { + if result.to_string() == "false" { + error!("Auth script returned false, authentication failed"); + return Ok(HttpResponse::Unauthorized() + .json(serde_json::json!({"error": "Authentication failed"}))); + } + } + Err(e) => { + error!("Failed to run auth script: {}", e); + return Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Auth failed"}))); } - } - Err(e) => { - error!("Failed to run auth script: {}", e); - return Ok(HttpResponse::InternalServerError() - .json(serde_json::json!({"error": "Auth failed"}))); } } let session = { let mut sm = data.session_manager.lock().await; - match sm.get_session_by_id(session_id_clone) { + match sm.get_session_by_id(session.id) { Ok(Some(s)) => s, Ok(None) => { error!("Failed to retrieve session"); diff --git a/src/automation/mod.rs b/src/automation/mod.rs index 13035db4..558d97e3 100644 --- a/src/automation/mod.rs +++ b/src/automation/mod.rs @@ -331,7 +331,7 @@ impl AutomationService { e ); - if let Some(s3_operator) = &self.state.s3_operator { + if let Some(client) = &self.state.s3_client { let bucket_name = format!( "{}{}.gbai", env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()), @@ -341,10 +341,9 @@ impl AutomationService { trace!("Downloading from bucket={} key={}", bucket_name, s3_key); - match s3_operator.read(&format!("{}/{}", bucket_name, s3_key)).await { + match crate::kb::minio_handler::get_file_content(client, &bucket_name, &s3_key).await { Ok(data) => { - let bytes: Vec = data.to_vec(); - match String::from_utf8(bytes) { + match String::from_utf8(data) { Ok(content) => { info!("Downloaded script '{}' from MinIO", 
param);
diff --git a/src/basic/keywords/get.rs b/src/basic/keywords/get.rs
index aa2ab44b..848d8f43 100644
--- a/src/basic/keywords/get.rs
+++ b/src/basic/keywords/get.rs
@@ -2,6 +2,7 @@ use crate::shared::models::UserSession;
 use crate::shared::state::AppState;
 use log::{debug, error, info};
 use reqwest::{self, Client};
+use crate::kb::minio_handler;
 use rhai::{Dynamic, Engine};
 use std::error::Error;
 use std::path::Path;
@@ -158,13 +159,7 @@ pub async fn get_from_bucket(
         return Err("Invalid file path".into());
     }
 
-    let s3_operator = match &state.s3_operator {
-        Some(operator) => operator,
-        None => {
-            error!("S3 operator not configured");
-            return Err("S3 operator not configured".into());
-        }
-    };
+    let client = state.s3_client.as_ref().ok_or("S3 client not configured")?;
 
     let bucket_name = {
         let cfg = state
@@ -187,11 +182,11 @@ pub async fn get_from_bucket(
         bucket
     };
 
-    let response = match tokio::time::timeout(
-        Duration::from_secs(30),
-        s3_operator.read(&format!("{}/{}", bucket_name, file_path))
+    let bytes = match tokio::time::timeout(
+        Duration::from_secs(30),
+        minio_handler::get_file_content(client, &bucket_name, file_path)
     ).await {
-        Ok(Ok(response)) => response,
+        Ok(Ok(data)) => data,
         Ok(Err(e)) => {
             error!("S3 read failed: {}", e);
             return Err(format!("S3 operation failed: {}", e).into());
@@ -202,15 +197,7 @@ pub async fn get_from_bucket(
         }
     };
 
-    let bytes = response.to_vec();
-    debug!(
-        "Retrieved {} bytes from S3 for key: {}",
-        bytes.len(),
-        file_path
-    );
-
     let content = if file_path.to_ascii_lowercase().ends_with(".pdf") {
-        debug!("Processing as PDF file: {}", file_path);
         match pdf_extract::extract_text_from_mem(&bytes) {
             Ok(text) => text,
             Err(e) => {
diff --git a/src/bootstrap/mod.rs b/src/bootstrap/mod.rs
index b4637b02..942d5ee6 100644
--- a/src/bootstrap/mod.rs
+++ b/src/bootstrap/mod.rs
@@ -1,20 +1,25 @@
 use crate::config::AppConfig;
 use crate::package_manager::{InstallMode, PackageManager};
 use anyhow::Result;
-use diesel::connection::SimpleConnection;
-use diesel::RunQueryDsl;
-use diesel::{Connection, QueryableByName};
+use diesel::{connection::SimpleConnection, RunQueryDsl, Connection, QueryableByName};
 use dotenvy::dotenv;
 use log::{debug, error, info, trace};
-use opendal::Operator;
+use aws_sdk_s3::Client;
+use aws_config::BehaviorVersion;
 use rand::distr::Alphanumeric;
 use rand::Rng;
 use sha2::{Digest, Sha256};
 use std::io::{self, Write};
 use std::path::Path;
 use std::process::Command;
+use std::sync::{Arc, Mutex};
+
+use diesel::Queryable;
 
 #[derive(QueryableByName)]
+#[diesel(check_for_backend(diesel::pg::Pg))]
+#[derive(Queryable)]
+#[diesel(table_name = crate::shared::models::schema::bots)]
 struct BotIdRow {
     #[diesel(sql_type = diesel::sql_types::Uuid)]
     id: uuid::Uuid,
@@ -28,21 +33,21 @@ pub struct ComponentInfo {
 pub struct BootstrapManager {
     pub install_mode: InstallMode,
     pub tenant: Option,
-    pub s3_operator: Operator,
+    pub s3_client: Client,
 }
 
 impl BootstrapManager {
-    pub fn new(install_mode: InstallMode, tenant: Option) -> Self {
+    pub async fn new(install_mode: InstallMode, tenant: Option) -> Self {
         info!(
             "Initializing BootstrapManager with mode {:?} and tenant {:?}",
             install_mode, tenant
         );
        let config = AppConfig::from_env();
-        let s3_operator = Self::create_s3_operator(&config);
+        let s3_client = futures::executor::block_on(Self::create_s3_operator(&config));
         Self {
             install_mode,
             tenant,
-            s3_operator,
+            s3_client,
         }
     }
 
@@ -140,8 +145,8 @@ impl BootstrapManager {
         let mut conn = diesel::pg::PgConnection::establish(&database_url)
             .map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?;
         let default_bot_id: uuid::Uuid = diesel::sql_query("SELECT id FROM bots LIMIT 1")
-            .get_result::(&mut conn)
-            .map(|row| row.id)
+            .load::(&mut conn)
+            .map(|rows| rows.first().map(|r| r.id).unwrap_or_else(|| uuid::Uuid::new_v4()))
             .unwrap_or_else(|_| uuid::Uuid::new_v4());
 
         if let Err(e) = self.update_bot_config(&default_bot_id, component.name) {
@@ -156,7 +161,8 @@ impl BootstrapManager {
         Ok(())
     }
 
-    pub fn bootstrap(&mut self) -> Result {
+    pub async fn bootstrap(&mut self) -> Result {
+        // First check for legacy mode
         if let Ok(tables_server) = std::env::var("TABLES_SERVER") {
             if !tables_server.is_empty() {
                 info!(
                 );
                 let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| {
                     let username =
-                        std::env::var("TABLES_USERNAME").unwrap_or_else(|_| "postgres".to_string());
+                        std::env::var("TABLES_USERNAME").unwrap_or_else(|_| "gbuser".to_string());
                     let password =
                         std::env::var("TABLES_PASSWORD").unwrap_or_else(|_| "postgres".to_string());
                     let server =
@@ -178,6 +184,11 @@ impl BootstrapManager {
                     )
                 });
 
+                // In legacy mode, still try to load config.csv if available
+                if let Ok(config) = self.load_config_from_csv().await {
+                    return Ok(config);
+                }
+
                 match diesel::PgConnection::establish(&database_url) {
                     Ok(mut conn) => {
                         if let Err(e) = self.apply_migrations(&mut conn) {
@@ -292,47 +303,50 @@ impl BootstrapManager {
            }
        }
 
-        self.s3_operator = Self::create_s3_operator(&config);
-        let default_bucket_path = Path::new("templates/default.gbai/default.gbot/config.csv");
-        if default_bucket_path.exists() {
-            info!("Found initial config.csv, uploading to default.gbai/default.gbot");
-            let operator = &self.s3_operator;
-            futures::executor::block_on(async {
-                let content = std::fs::read(default_bucket_path).expect("Failed to read config.csv");
-                operator.write("default.gbai/default.gbot/config.csv", content).await
-            })?;
-            debug!("Initial config.csv uploaded successfully");
+        self.s3_client = futures::executor::block_on(Self::create_s3_operator(&config));
+
+        // Load config from CSV if available
+        if let Ok(csv_config) = self.load_config_from_csv().await {
+            Ok(csv_config)
+        } else {
+            Ok(config)
         }
-        Ok(config)
     }
 
-    fn create_s3_operator(config: &AppConfig) -> Operator {
-        use opendal::Scheme;
-        use std::collections::HashMap;
-        let mut endpoint = config.drive.server.clone();
-        if !endpoint.ends_with('/') {
-            endpoint.push('/');
-        }
-        let mut map = HashMap::new();
-        map.insert("endpoint".to_string(), endpoint);
-        map.insert("access_key_id".to_string(), config.drive.access_key.clone());
-        map.insert(
-            "secret_access_key".to_string(),
-            config.drive.secret_key.clone(),
-        );
-        map.insert(
-            "bucket".to_string(),
-            format!("default.gbai"),
-        );
-        map.insert("region".to_string(), "auto".to_string());
-        map.insert("force_path_style".to_string(), "true".to_string());
-        trace!("Creating S3 operator with endpoint {}", config.drive.server);
+    async fn create_s3_operator(config: &AppConfig) -> Client {
+        let endpoint = if !config.drive.server.ends_with('/') {
+            format!("{}/", config.drive.server)
+        } else {
+            config.drive.server.clone()
+        };
 
-        Operator::via_iter(Scheme::S3, map).expect("Failed to initialize S3 operator")
+        let base_config = aws_config::defaults(BehaviorVersion::latest())
+            .endpoint_url(endpoint)
+            .region("auto")
+            .credentials_provider(
+                aws_sdk_s3::config::Credentials::new(
+                    config.drive.access_key.clone(),
+                    config.drive.secret_key.clone(),
+                    None,
+                    None,
+                    "static",
+                )
+            )
+            .load()
+            .await;
+
+        let s3_config = aws_sdk_s3::config::Builder::from(&base_config)
+            .force_path_style(true)
+            .build();
+
+        aws_sdk_s3::Client::from_conf(s3_config)
     }
+
+
+
     fn generate_secure_password(&self, length: usize) -> String {
         let mut rng = rand::rng();
         std::iter::repeat_with(|| rng.sample(Alphanumeric) as char)
@@ -381,7 +395,7 @@ impl BootstrapManager {
         if !templates_dir.exists() {
             return Ok(());
         }
-        let operator = &self.s3_operator;
+        let client = &self.s3_client;
         for entry in std::fs::read_dir(templates_dir)? {
             let entry = entry?;
             let path = entry.path();
@@ -395,27 +409,30 @@ impl BootstrapManager {
             let bot_name = path.file_name().unwrap().to_string_lossy().to_string();
             let bucket = bot_name.trim_start_matches('/').to_string();
             info!("Uploading template {} to Drive bucket {}", bot_name, bucket);
-if operator.stat(&bucket).await.is_err() {
-    info!("Bucket {} not found, creating it", bucket);
-    let bucket_path = if bucket.ends_with('/') { bucket.clone() } else { format!("{}/", bucket) };
-match operator.create_dir(&bucket_path).await {
-    Ok(_) => {
-        debug!("Bucket {} created successfully", bucket);
-    }
-    Err(e) => {
-        let err_msg = format!("{}", e);
-        if err_msg.contains("BucketAlreadyOwnedByYou") {
-            log::warn!("Bucket {} already exists, reusing default.gbai", bucket);
-            self.upload_directory_recursive(&operator, &Path::new("templates/default.gbai"), "default.gbai").await?;
-            continue;
-        } else {
-            return Err(e.into());
-        }
-    }
-}
-}
-self.upload_directory_recursive(&operator, &path, &bucket).await?;
-info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
+
+            // Check if bucket exists
+            if client.head_bucket().bucket(&bucket).send().await.is_err() {
+                info!("Bucket {} not found, creating it", bucket);
+                match client.create_bucket()
+                    .bucket(&bucket)
+                    .send()
+                    .await {
+                    Ok(_) => {
+                        debug!("Bucket {} created successfully", bucket);
+                    }
+                    Err(e) => {
+                        error!("Failed to create bucket {}: {:?}", bucket, e);
+                        return Err(anyhow::anyhow!(
+                            "Failed to create bucket {}: {}. Check S3 credentials and endpoint configuration",
+                            bucket, e
+                        ));
+                    }
+                }
+            }
+
+            self.upload_directory_recursive(client, &path, &bucket, "/")
+                .await?;
+            info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
             }
         }
         Ok(())
     }
@@ -436,22 +453,9 @@
         if path.is_dir() && path.extension().map(|e| e == "gbai").unwrap_or(false) {
             let bot_folder = path.file_name().unwrap().to_string_lossy().to_string();
             let bot_name = bot_folder.trim_end_matches(".gbai");
-            let formatted_name = bot_name
-                .split('_')
-                .map(|word| {
-                    let mut chars = word.chars();
-                    match chars.next() {
-                        None => String::new(),
-                        Some(first) => {
-                            first.to_uppercase().collect::() + chars.as_str()
-                        }
-                    }
-                })
-                .collect::>()
-                .join(" ");
 
             let existing: Option = bots::table
-                .filter(bots::name.eq(&formatted_name))
+                .filter(bots::name.eq(&bot_name))
                 .select(bots::name)
                 .first(conn)
                 .optional()?;
@@ -461,11 +465,11 @@
                     "INSERT INTO bots (id, name, description, llm_provider, llm_config, context_provider, context_config, is_active) \
                      VALUES (gen_random_uuid(), $1, $2, 'openai', '{\"model\": \"gpt-4\", \"temperature\": 0.7}', 'database', '{}', true)"
                 )
-                .bind::(&formatted_name)
+                .bind::(&bot_name)
                 .bind::(format!("Bot for {} template", bot_name))
                 .execute(conn)?;
             } else {
-                log::trace!("Bot {} already exists", formatted_name);
+                log::trace!("Bot {} already exists", bot_name);
            }
        }
    }
@@ -475,8 +479,9 @@
     fn upload_directory_recursive<'a>(
         &'a self,
-        client: &'a Operator,
+        client: &'a Client,
         local_path: &'a Path,
+        bucket: &'a str,
         prefix: &'a str,
     ) -> std::pin::Pin> + 'a>> {
         Box::pin(async move {
@@ -490,26 +495,79 @@
             let entry = entry?;
             let path = entry.path();
             let file_name = path.file_name().unwrap().to_string_lossy().to_string();
-            let key = if prefix.is_empty() {
-                file_name.clone()
-            } else {
-                format!("{}/{}", prefix.trim_end_matches('/'), file_name)
-            };
+
+            // Construct key path, ensuring no duplicate slashes
+            let mut key = prefix.trim_matches('/').to_string();
+            if !key.is_empty() {
+                key.push('/');
+            }
+            key.push_str(&file_name);
 
             if path.is_file() {
-                info!("Uploading file: {} with key: {}", path.display(), key);
+                info!("Uploading file: {} to bucket {} with key: {}",
+                    path.display(), bucket, key);
                 let content = std::fs::read(&path)?;
-                trace!("Writing file {} with key {}", path.display(), key);
-                client.write(&key, content).await?;
-                trace!("Successfully wrote file {}", path.display());
+                client.put_object()
+                    .bucket(bucket)
+                    .key(&key)
+                    .body(content.into())
+                    .send()
+                    .await?;
             } else if path.is_dir() {
-                self.upload_directory_recursive(client, &path, &key).await?;
+                self.upload_directory_recursive(client, &path, bucket, &key).await?;
            }
        }
 
        Ok(())
    })
 }
+
+    async fn load_config_from_csv(&self) -> Result {
+        use crate::config::ConfigManager;
+        use uuid::Uuid;
+
+        let client = &self.s3_client;
+        let bucket = "default.gbai";
+        let config_key = "default.gbot/config.csv";
+
+        match client.get_object()
+            .bucket(bucket)
+            .key(config_key)
+            .send()
+            .await
+        {
+            Ok(response) => {
+                let bytes = response.body.collect().await?.into_bytes();
+                let csv_content = String::from_utf8(bytes.to_vec())?;
+
+                let database_url = std::env::var("DATABASE_URL")
+                    .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string());
+                // Create new connection for config loading
connection for config loading + let config_conn = diesel::PgConnection::establish(&database_url)?; + let config_manager = ConfigManager::new(Arc::new(Mutex::new(config_conn))); + + // Use default bot ID or create one if needed + let default_bot_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000")?; + + // Write CSV to temp file for ConfigManager + let temp_path = std::env::temp_dir().join("config.csv"); + std::fs::write(&temp_path, csv_content)?; + + config_manager.sync_gbot_config(&default_bot_id, temp_path.to_str().unwrap()) + .map_err(|e| anyhow::anyhow!("Failed to sync gbot config: {}", e))?; + + // Load config from database which now has the CSV values + let mut config_conn = diesel::PgConnection::establish(&database_url)?; + let config = AppConfig::from_database(&mut config_conn); + info!("Successfully loaded config from CSV"); + Ok(config) + } + Err(e) => { + debug!("No config.csv found: {}", e); + Err(e.into()) + } + } + } + fn apply_migrations(&self, conn: &mut diesel::PgConnection) -> Result<()> { let migrations_dir = std::path::Path::new("migrations"); if !migrations_dir.exists() { diff --git a/src/bot/mod.rs b/src/bot/mod.rs index bd56ba6d..db54784e 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -3,6 +3,7 @@ use crate::shared::models::{BotResponse, UserMessage, UserSession}; use crate::shared::state::AppState; use actix_web::{web, HttpRequest, HttpResponse, Result}; use actix_ws::Message as WsMessage; +use diesel::PgConnection; use log::{debug, error, info, warn}; use chrono::Utc; use serde_json; @@ -11,18 +12,140 @@ use std::sync::Arc; use tokio::sync::mpsc; use crate::kb::embeddings::generate_embeddings; use uuid::Uuid; - use crate::kb::qdrant_client::{ensure_collection_exists, get_qdrant_client, QdrantPoint}; -use crate::context::langcache::{get_langcache_client}; - +use crate::context::langcache::get_langcache_client; +use crate::drive_monitor::DriveMonitor; +use tokio::sync::Mutex as AsyncMutex; pub struct BotOrchestrator { pub state: Arc, + pub mounted_bots: Arc>>>, } impl BotOrchestrator { pub fn new(state: Arc) -> Self { - Self { state } + Self { + state, + mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())), + } + } + + pub async fn mount_all_bots(&self) -> Result<(), Box> { + use crate::shared::models::schema::bots::dsl::*; + use diesel::prelude::*; + + let mut db_conn = self.state.conn.lock().unwrap(); + let active_bots = bots + .filter(is_active.eq(true)) + .select(id) + .load::(&mut *db_conn) + .map_err(|e| { + error!("Failed to query active bots: {}", e); + e + })?; + + for bot_guid in active_bots { + let state_clone = self.state.clone(); + let mounted_bots_clone = self.mounted_bots.clone(); + let bot_guid_str = bot_guid.to_string(); + + tokio::spawn(async move { + if let Err(e) = Self::mount_bot_task(state_clone, mounted_bots_clone, bot_guid_str.clone()).await { + error!("Failed to mount bot {}: {}", bot_guid_str, e); + } + }); + } + + Ok(()) + } + + async fn mount_bot_task( + state: Arc, + mounted_bots: Arc>>>, + bot_guid: String, + ) -> Result<(), Box> { + use diesel::prelude::*; + use crate::shared::models::schema::bots::dsl::*; + + let bot_name: String = { + let mut db_conn = state.conn.lock().unwrap(); + bots + .filter(id.eq(Uuid::parse_str(&bot_guid)?)) + .select(name) + .first(&mut *db_conn) + .map_err(|e| { + error!("Failed to query bot name for {}: {}", bot_guid, e); + e + })? 
+ }; + + let bucket_name = format!("{}.gbai", bot_name); + + { + let mounted = mounted_bots.lock().await; + if mounted.contains_key(&bot_guid) { + warn!("Bot {} is already mounted", bot_guid); + return Ok(()); + } + } + + let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name)); + + let _handle = drive_monitor.clone().spawn().await; + + { + let mut mounted = mounted_bots.lock().await; + mounted.insert(bot_guid.clone(), drive_monitor); + } + + info!("Bot {} mounted successfully", bot_guid); + Ok(()) + } + + pub async fn create_bot(&self, bot_guid: &str) -> Result<(), Box> { + let bucket_name = format!("{}{}.gbai", self.state.config.as_ref().unwrap().drive.org_prefix, bot_guid); + crate::create_bucket::create_bucket(&bucket_name)?; + Ok(()) + } + + pub async fn mount_bot(&self, bot_guid: &str) -> Result<(), Box> { + let bot_guid = bot_guid.strip_suffix(".gbai").unwrap_or(bot_guid).to_string(); + + use diesel::prelude::*; + use crate::shared::models::schema::bots::dsl::*; + + let bot_name: String = { + let mut db_conn = self.state.conn.lock().unwrap(); + bots + .filter(id.eq(Uuid::parse_str(&bot_guid)?)) + .select(name) + .first(&mut *db_conn) + .map_err(|e| { + error!("Failed to query bot name for {}: {}", bot_guid, e); + e + })? + }; + + let bucket_name = format!("{}.gbai", bot_name); + + { + let mounted_bots = self.mounted_bots.lock().await; + if mounted_bots.contains_key(&bot_guid) { + warn!("Bot {} is already mounted", bot_guid); + return Ok(()); + } + } + + let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name)); + + let _handle = drive_monitor.clone().spawn().await; + + { + let mut mounted_bots = self.mounted_bots.lock().await; + mounted_bots.insert(bot_guid.clone(), drive_monitor); + } + + Ok(()) } pub async fn handle_user_input( @@ -30,10 +153,7 @@ impl BotOrchestrator { session_id: Uuid, user_input: &str, ) -> Result, Box> { - info!( - "Handling user input for session {}: '{}'", - session_id, user_input - ); + info!("Handling user input for session {}: '{}'", session_id, user_input); let mut session_manager = self.state.session_manager.lock().await; session_manager.provide_input(session_id, user_input.to_string())?; Ok(None) @@ -74,10 +194,7 @@ impl BotOrchestrator { bot_id: &str, mode: i32, ) -> Result<(), Box> { - info!( - "Setting answer mode for user {} with bot {} to mode {}", - user_id, bot_id, mode - ); + info!("Setting answer mode for user {} with bot {} to mode {}", user_id, bot_id, mode); let mut session_manager = self.state.session_manager.lock().await; session_manager.update_answer_mode(user_id, bot_id, mode)?; Ok(()) @@ -92,10 +209,7 @@ impl BotOrchestrator { event_type: &str, data: serde_json::Value, ) -> Result<(), Box> { - info!( - "Sending event '{}' to session {} on channel {}", - event_type, session_id, channel - ); + info!("Sending event '{}' to session {} on channel {}", event_type, session_id, channel); let event_response = BotResponse { bot_id: bot_id.to_string(), user_id: user_id.to_string(), @@ -113,8 +227,9 @@ impl BotOrchestrator { if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { adapter.send_message(event_response).await?; } else { - warn!("No channel adapter found for channel 1: {}", channel); + warn!("No channel adapter found for channel: {}", channel); } + Ok(()) } @@ -124,10 +239,7 @@ impl BotOrchestrator { channel: &str, content: &str, ) -> Result<(), Box> { - info!( - "Sending direct message to session {}: '{}'", - session_id, content - ); + info!("Sending direct message to session 
{}: '{}'", session_id, content); let bot_response = BotResponse { bot_id: "default_bot".to_string(), user_id: "default_user".to_string(), @@ -142,8 +254,9 @@ impl BotOrchestrator { if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { adapter.send_message(bot_response).await?; } else { - warn!("No channel adapter found for channel 2: {}", channel); + warn!("No channel adapter found for direct message on channel: {}", channel); } + Ok(()) } @@ -151,60 +264,35 @@ impl BotOrchestrator { &self, message: UserMessage, ) -> Result<(), Box> { - info!( - "Processing message from channel: {}, user: {}, session: {}", - message.channel, message.user_id, message.session_id - ); - debug!( - "Message content: '{}', type: {}", - message.content, message.message_type - ); + info!("Processing message from channel: {}, user: {}, session: {}", message.channel, message.user_id, message.session_id); + debug!("Message content: '{}', type: {}", message.content, message.message_type); let user_id = Uuid::parse_str(&message.user_id).map_err(|e| { error!("Invalid user ID provided: {}", e); e })?; - let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") { - Uuid::parse_str(&bot_guid).map_err(|e| { - warn!("Invalid BOT_GUID from env: {}", e); - e - })? - } else { - warn!("BOT_GUID not set in environment, using nil UUID"); - Uuid::nil() - }; - + let bot_id = Uuid::nil(); let session = { let mut sm = self.state.session_manager.lock().await; let session_id = Uuid::parse_str(&message.session_id).map_err(|e| { error!("Invalid session ID: {}", e); e })?; + match sm.get_session_by_id(session_id)? { Some(session) => session, None => { - error!( - "Failed to create session for user {} with bot {}", - user_id, bot_id - ); + error!("Failed to create session for user {} with bot {}", user_id, bot_id); return Err("Failed to create session".into()); } } }; if self.is_waiting_for_input(session.id).await { - debug!( - "Session {} is waiting for input, processing as variable input", - session.id - ); - if let Some(variable_name) = - self.handle_user_input(session.id, &message.content).await? - { - debug!( - "Stored user input in variable '{}' for session {}", - variable_name, session.id - ); + debug!("Session {} is waiting for input, processing as variable input", session.id); + if let Some(variable_name) = self.handle_user_input(session.id, &message.content).await? { + info!("Stored user input in variable '{}' for session {}", variable_name, session.id); if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { let ack_response = BotResponse { bot_id: message.bot_id.clone(), @@ -263,10 +351,7 @@ impl BotOrchestrator { if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { adapter.send_message(bot_response).await?; } else { - warn!( - "No channel adapter found for channel 3: {}", - message.channel - ); + warn!("No channel adapter found for message channel: {}", message.channel); } Ok(()) @@ -298,7 +383,6 @@ impl BotOrchestrator { session_manager.get_conversation_history(session.id, session.user_id)? }; - // Prompt compactor: keep only last 10 entries let recent_history = if history.len() > 10 { &history[history.len() - 10..] 
} else { @@ -308,31 +392,23 @@ impl BotOrchestrator { for (role, content) in recent_history { prompt.push_str(&format!("{}: {}\n", role, content)); } + prompt.push_str(&format!("User: {}\nAssistant:", message.content)); - // Determine which cache backend to use let use_langcache = std::env::var("LLM_CACHE") .unwrap_or_else(|_| "false".to_string()) .eq_ignore_ascii_case("true"); if use_langcache { - // Ensure LangCache collection exists ensure_collection_exists(&self.state, "semantic_cache").await?; - - // Get LangCache client let langcache_client = get_langcache_client()?; - - // Isolate the user question (ignore conversation history) let isolated_question = message.content.trim().to_string(); - - // Generate embedding for the isolated question let question_embeddings = generate_embeddings(vec![isolated_question.clone()]).await?; let question_embedding = question_embeddings .get(0) .ok_or_else(|| "Failed to generate embedding for question")? .clone(); - // Search for similar question in LangCache let search_results = langcache_client .search("semantic_cache", question_embedding.clone(), 1) .await?; @@ -344,13 +420,11 @@ impl BotOrchestrator { } } - // Generate response via LLM provider using full prompt (including history) let response = self.state .llm_provider .generate(&prompt, &serde_json::Value::Null) .await?; - // Store isolated question and response in LangCache let point = QdrantPoint { id: uuid::Uuid::new_v4().to_string(), vector: question_embedding, @@ -360,26 +434,21 @@ impl BotOrchestrator { "response": response }), }; + langcache_client .upsert_points("semantic_cache", vec![point]) .await?; Ok(response) } else { - // Ensure semantic cache collection exists ensure_collection_exists(&self.state, "semantic_cache").await?; - - // Get Qdrant client let qdrant_client = get_qdrant_client(&self.state)?; - - // Generate embedding for the prompt let embeddings = generate_embeddings(vec![prompt.clone()]).await?; let embedding = embeddings .get(0) .ok_or_else(|| "Failed to generate embedding")? .clone(); - // Search for similar prompt in Qdrant let search_results = qdrant_client .search("semantic_cache", embedding.clone(), 1) .await?; @@ -392,13 +461,11 @@ impl BotOrchestrator { } } - // Generate response via LLM provider let response = self.state .llm_provider .generate(&prompt, &serde_json::Value::Null) .await?; - // Store prompt and response in Qdrant let point = QdrantPoint { id: uuid::Uuid::new_v4().to_string(), vector: embedding, @@ -407,14 +474,13 @@ impl BotOrchestrator { "response": response }), }; + qdrant_client .upsert_points("semantic_cache", vec![point]) .await?; Ok(response) } - - } pub async fn stream_response( @@ -422,10 +488,7 @@ impl BotOrchestrator { message: UserMessage, response_tx: mpsc::Sender, ) -> Result<(), Box> { - info!( - "Streaming response for user: {}, session: {}", - message.user_id, message.session_id - ); + info!("Streaming response for user: {}, session: {}", message.user_id, message.session_id); let user_id = Uuid::parse_str(&message.user_id).map_err(|e| { error!("Invalid user ID: {}", e); @@ -448,6 +511,7 @@ impl BotOrchestrator { error!("Invalid session ID: {}", e); e })?; + match sm.get_session_by_id(session_id)? 
{ Some(sess) => sess, None => { @@ -500,12 +564,9 @@ impl BotOrchestrator { for (role, content) in &history { p.push_str(&format!("{}: {}\n", role, content)); } - p.push_str(&format!("User: {}\nAssistant:", message.content)); - debug!( - "Stream prompt constructed with {} history entries", - history.len() - ); + p.push_str(&format!("User: {}\nAssistant:", message.content)); + info!("Stream prompt constructed with {} history entries", history.len()); p }; @@ -556,22 +617,20 @@ impl BotOrchestrator { if !first_word_received && !chunk.trim().is_empty() { first_word_received = true; - debug!("First word received in stream: '{}'", chunk); } analysis_buffer.push_str(&chunk); + if analysis_buffer.contains("**") && !in_analysis { in_analysis = true; } if in_analysis { if analysis_buffer.ends_with("final") { - debug!( - "Analysis section completed, buffer length: {}", - analysis_buffer.len() - ); + info!("Analysis section completed, buffer length: {}", analysis_buffer.len()); in_analysis = false; analysis_buffer.clear(); + if message.channel == "web" { let orchestrator = BotOrchestrator::new(Arc::clone(&self.state)); orchestrator @@ -595,6 +654,7 @@ impl BotOrchestrator { } full_response.push_str(&chunk); + let partial = BotResponse { bot_id: message.bot_id.clone(), user_id: message.user_id.clone(), @@ -612,10 +672,7 @@ impl BotOrchestrator { } } - debug!( - "Stream processing completed, {} chunks processed", - chunk_count - ); + info!("Stream processing completed, {} chunks processed", chunk_count); { let mut sm = self.state.session_manager.lock().await; @@ -632,8 +689,8 @@ impl BotOrchestrator { stream_token: None, is_complete: true, }; - response_tx.send(final_msg).await?; + response_tx.send(final_msg).await?; Ok(()) } @@ -651,10 +708,7 @@ impl BotOrchestrator { session_id: Uuid, user_id: Uuid, ) -> Result, Box> { - info!( - "Getting conversation history for session {} user {}", - session_id, user_id - ); + info!("Getting conversation history for session {} user {}", session_id, user_id); let mut session_manager = self.state.session_manager.lock().await; let history = session_manager.get_conversation_history(session_id, user_id)?; Ok(history) @@ -665,12 +719,11 @@ impl BotOrchestrator { state: Arc, token: Option, ) -> Result> { - info!( - "Running start script for session: {} with token: {:?}", - session.id, token - ); - let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); + info!("Running start script for session: {} with token: {:?}", session.id, token); + + let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot")); let start_script_path = format!("./{}.gbai/.gbdialog/start.bas", bot_guid); + let start_script = match std::fs::read_to_string(&start_script_path) { Ok(content) => content, Err(_) => { @@ -678,10 +731,8 @@ impl BotOrchestrator { return Ok(true); } }; - debug!( - "Start script content for session {}: {}", - session.id, start_script - ); + + info!("Start script content for session {}: {}", session.id, start_script); let session_clone = session.clone(); let state_clone = state.clone(); @@ -694,17 +745,11 @@ impl BotOrchestrator { .and_then(|ast| script_service.run(&ast)) { Ok(result) => { - info!( - "Start script executed successfully for session {}, result: {}", - session_clone.id, result - ); + info!("Start script executed successfully for session {}, result: {}", session_clone.id, result); Ok(true) } Err(e) => { - error!( - "Failed to run start script for session {}: {}", - session_clone.id, e - ); + error!("Failed 
to run start script for session {}: {}", session_clone.id, e); Ok(false) } } @@ -716,10 +761,8 @@ impl BotOrchestrator { channel: &str, message: &str, ) -> Result<(), Box> { - warn!( - "Sending warning to session {} on channel {}: {}", - session_id, channel, message - ); + warn!("Sending warning to session {} on channel {}: {}", session_id, channel, message); + if channel == "web" { self.send_event( "system", @@ -747,10 +790,7 @@ impl BotOrchestrator { }; adapter.send_message(warn_response).await } else { - warn!( - "No channel adapter found for warning on channel: {}", - channel - ); + warn!("No channel adapter found for warning on channel: {}", channel); Ok(()) } } @@ -763,10 +803,8 @@ impl BotOrchestrator { _bot_id: &str, token: Option, ) -> Result> { - info!( - "Triggering auto welcome for user: {}, session: {}, token: {:?}", - user_id, session_id, token - ); + info!("Triggering auto welcome for user: {}, session: {}, token: {:?}", user_id, session_id, token); + let session_uuid = Uuid::parse_str(session_id).map_err(|e| { error!("Invalid session ID: {}", e); e @@ -784,22 +822,53 @@ impl BotOrchestrator { }; let result = Self::run_start_script(&session, Arc::clone(&self.state), token).await?; - info!( - "Auto welcome completed for session: {} with result: {}", - session_id, result - ); + info!("Auto welcome completed for session: {} with result: {}", session_id, result); Ok(result) } +} - async fn get_web_response_channel( - &self, - session_id: &str, - ) -> Result, Box> { - let response_channels = self.state.response_channels.lock().await; - if let Some(tx) = response_channels.get(session_id) { - Ok(tx.clone()) - } else { - Err("No response channel found for session".into()) +pub fn bot_from_url( + db_conn: &mut PgConnection, + path: &str +) -> Result<(Uuid, String), HttpResponse> { + use crate::shared::models::schema::bots::dsl::*; + use diesel::prelude::*; + + // Extract bot name from first path segment + if let Some(bot_name) = path.split('/').nth(1).filter(|s| !s.is_empty()) { + match bots + .filter(name.eq(bot_name)) + .filter(is_active.eq(true)) + .select((id, name)) + .first::<(Uuid, String)>(db_conn) + .optional() + { + Ok(Some((bot_id, bot_name))) => return Ok((bot_id, bot_name)), + Ok(None) => warn!("No active bot found with name: {}", bot_name), + Err(e) => error!("Failed to query bot by name: {}", e), + } + } + + // Fall back to first available bot + match bots + .filter(is_active.eq(true)) + .select((id, name)) + .first::<(Uuid, String)>(db_conn) + .optional() + { + Ok(Some((first_bot_id, first_bot_name))) => { + log::info!("Using first available bot: {} ({})", first_bot_id, first_bot_name); + Ok((first_bot_id, first_bot_name)) + } + Ok(None) => { + error!("No active bots found in database"); + Err(HttpResponse::ServiceUnavailable() + .json(serde_json::json!({"error": "No bots available"}))) + } + Err(e) => { + error!("Failed to query bots: {}", e); + Err(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Failed to query bots"}))) } } } @@ -808,6 +877,7 @@ impl Default for BotOrchestrator { fn default() -> Self { Self { state: Arc::new(AppState::default()), + mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())), } } } @@ -826,7 +896,6 @@ async fn websocket_handler( .unwrap_or_else(|| Uuid::new_v4().to_string()) .replace("undefined", &Uuid::new_v4().to_string()); - // Ensure user exists in database before proceeding let user_id = { let user_uuid = Uuid::parse_str(&user_id_string).unwrap_or_else(|_| Uuid::new_v4()); let mut sm = 
data.session_manager.lock().await; @@ -841,8 +910,8 @@ async fn websocket_handler( let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?; let (tx, mut rx) = mpsc::channel::(100); - let orchestrator = BotOrchestrator::new(Arc::clone(&data)); + let orchestrator = BotOrchestrator::new(Arc::clone(&data)); orchestrator .register_response_channel(session_id.clone(), tx.clone()) .await; @@ -855,7 +924,6 @@ async fn websocket_handler( .add_connection(session_id.clone(), tx.clone()) .await; - // Get first available bot from database let bot_id = { use crate::shared::models::schema::bots::dsl::*; use diesel::prelude::*; @@ -897,16 +965,13 @@ async fn websocket_handler( .await .ok(); - info!( - "WebSocket connection established for session: {}, user: {}", - session_id, user_id - ); + info!("WebSocket connection established for session: {}, user: {}", session_id, user_id); - // Trigger auto welcome (start.bas) let orchestrator_clone = BotOrchestrator::new(Arc::clone(&data)); let user_id_welcome = user_id.clone(); let session_id_welcome = session_id.clone(); let bot_id_welcome = bot_id.clone(); + actix_web::rt::spawn(async move { if let Err(e) = orchestrator_clone .trigger_auto_welcome(&session_id_welcome, &user_id_welcome, &bot_id_welcome, None) @@ -922,10 +987,7 @@ async fn websocket_handler( let user_id_clone = user_id.clone(); actix_web::rt::spawn(async move { - info!( - "Starting WebSocket sender for session {}", - session_id_clone1 - ); + info!("Starting WebSocket sender for session {}", session_id_clone1); let mut message_count = 0; while let Some(msg) = rx.recv().await { message_count += 1; @@ -936,23 +998,17 @@ async fn websocket_handler( } } } - info!( - "WebSocket sender terminated for session {}, sent {} messages", - session_id_clone1, message_count - ); + info!("WebSocket sender terminated for session {}, sent {} messages", session_id_clone1, message_count); }); actix_web::rt::spawn(async move { - info!( - "Starting WebSocket receiver for session {}", - session_id_clone2 - ); + info!("Starting WebSocket receiver for session {}", session_id_clone2); let mut message_count = 0; while let Some(Ok(msg)) = msg_stream.recv().await { match msg { WsMessage::Text(text) => { message_count += 1; - // Get first available bot from database + let bot_id = { use crate::shared::models::schema::bots::dsl::*; use diesel::prelude::*; @@ -976,42 +1032,37 @@ async fn websocket_handler( } }; - // Parse the text as JSON to extract the content field let json_value: serde_json::Value = match serde_json::from_str(&text) { Ok(value) => value, Err(e) => { error!("Error parsing JSON message {}: {}", message_count, e); - continue; // Skip processing this message + continue; } }; - // Extract content from JSON, fallback to original text if content field doesn't exist let content = json_value["content"] .as_str() .map(|s| s.to_string()) .unwrap(); let user_message = UserMessage { - bot_id: bot_id, + bot_id, user_id: user_id_clone.clone(), session_id: session_id_clone2.clone(), channel: "web".to_string(), - content: content, + content, message_type: 1, media_url: None, timestamp: Utc::now(), }; if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await { - error!( - "Error processing WebSocket message {}: {}", - message_count, e - ); + error!("Error processing WebSocket message {}: {}", message_count, e); } } - - WsMessage::Close(_) => { - // Get first available bot from database + WsMessage::Close(reason) => { + debug!("WebSocket closing for session {} - reason: {:?}", 
session_id_clone2, reason); + let bot_id = { use crate::shared::models::schema::bots::dsl::*; use diesel::prelude::*; @@ -1034,7 +1085,9 @@ async fn websocket_handler( } } }; - orchestrator + + debug!("Sending session_end event for {}", session_id_clone2); + if let Err(e) = orchestrator .send_event( &user_id_clone, &bot_id, @@ -1044,26 +1097,28 @@ async fn websocket_handler( serde_json::json!({}), ) .await - .ok(); + { + error!("Failed to send session_end event: {}", e); + } + + debug!("Removing WebSocket connection for {}", session_id_clone2); web_adapter.remove_connection(&session_id_clone2).await; + + debug!("Unregistering response channel for {}", session_id_clone2); orchestrator .unregister_response_channel(&session_id_clone2) .await; + + info!("WebSocket fully closed for session {}", session_id_clone2); break; } _ => {} } } - info!( - "WebSocket receiver terminated for session {}, processed {} messages", - session_id_clone2, message_count - ); + info!("WebSocket receiver terminated for session {}, processed {} messages", session_id_clone2, message_count); }); - info!( - "WebSocket handler setup completed for session {}", - session_id - ); + info!("WebSocket handler setup completed for session {}", session_id); Ok(res) } @@ -1076,6 +1131,7 @@ async fn start_session( .get("session_id") .and_then(|s| s.as_str()) .unwrap_or(""); + let token = info .get("token") .and_then(|t| t.as_str()) @@ -1109,12 +1165,10 @@ async fn start_session( }; let result = BotOrchestrator::run_start_script(&session, Arc::clone(&data), token).await; + match result { Ok(true) => { - info!( - "Start script completed successfully for session: {}", - session_id - ); + info!("Start script completed successfully for session: {}", session_id); Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "started", "session_id": session.id, @@ -1130,10 +1184,7 @@ async fn start_session( }))) } Err(e) => { - error!( - "Error running start script for session {}: {}", - session_id, e - ); + error!("Error running start script for session {}: {}", session_id, e); Ok(HttpResponse::InternalServerError() .json(serde_json::json!({"error": e.to_string()}))) } @@ -1148,14 +1199,12 @@ async fn send_warning_handler( let default_session = "default".to_string(); let default_channel = "web".to_string(); let default_message = "Warning!".to_string(); + let session_id = info.get("session_id").unwrap_or(&default_session); let channel = info.get("channel").unwrap_or(&default_channel); let message = info.get("message").unwrap_or(&default_message); - info!( - "Sending warning via API - session: {}, channel: {}", - session_id, channel - ); + info!("Sending warning via API - session: {}, channel: {}", session_id, channel); let orchestrator = BotOrchestrator::new(Arc::clone(&data)); if let Err(e) = orchestrator diff --git a/src/config/mod.rs b/src/config/mod.rs index f0f75a58..2724ddbc 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -3,6 +3,8 @@ use diesel::sql_types::Text; use log::{info, warn}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::fs::OpenOptions; +use std::io::Write; use std::path::PathBuf; use std::sync::{Arc, Mutex}; @@ -190,11 +192,18 @@ impl AppConfig { .and_then(|p| p.parse().ok()) .unwrap_or_else(|| get_u32("CUSTOM_PORT", 5432)), database: std::env::var("CUSTOM_DATABASE") - .unwrap_or_else(|_| get_str("CUSTOM_DATABASE", "botserver")), + .unwrap_or_else(|_| get_str("CUSTOM_DATABASE", "gbuser")), }; let minio = DriveConfig { - server: get_str("DRIVE_SERVER", "http://localhost:9000"), + 
server: { + let server = get_str("DRIVE_SERVER", "http://localhost:9000"); + if !server.starts_with("http://") && !server.starts_with("https://") { + format!("http://{}", server) + } else { + server + } + }, access_key: get_str("DRIVE_ACCESSKEY", "minioadmin"), secret_key: get_str("DRIVE_SECRET", "minioadmin"), use_ssl: get_bool("DRIVE_USE_SSL", false), @@ -216,6 +225,11 @@ impl AppConfig { endpoint: get_str("AI_ENDPOINT", "https://api.openai.com"), }; + // Write drive config to .env file + if let Err(e) = write_drive_config_to_env(&minio) { + warn!("Failed to write drive config to .env: {}", e); + } + AppConfig { drive: minio, server: ServerConfig { @@ -367,6 +381,22 @@ impl AppConfig { } } +fn write_drive_config_to_env(drive: &DriveConfig) -> std::io::Result<()> { + let mut file = OpenOptions::new() + .append(true) + .create(true) + .open(".env")?; + + writeln!(file,"")?; + writeln!(file, "DRIVE_SERVER={}", drive.server)?; + writeln!(file, "DRIVE_ACCESSKEY={}", drive.access_key)?; + writeln!(file, "DRIVE_SECRET={}", drive.secret_key)?; + writeln!(file, "DRIVE_USE_SSL={}", drive.use_ssl)?; + writeln!(file, "DRIVE_ORG_PREFIX={}", drive.org_prefix)?; + + Ok(()) +} + fn parse_database_url(url: &str) -> (String, String, String, u32, String) { if let Some(stripped) = url.strip_prefix("postgres://") { let parts: Vec<&str> = stripped.split('@').collect(); diff --git a/src/create_bucket.rs b/src/create_bucket.rs new file mode 100644 index 00000000..c2163446 --- /dev/null +++ b/src/create_bucket.rs @@ -0,0 +1,8 @@ +use std::fs; +use std::path::Path; + +pub fn create_bucket(bucket_name: &str) -> std::io::Result<()> { + let bucket_path = Path::new("buckets").join(bucket_name); + fs::create_dir_all(&bucket_path)?; + Ok(()) +} diff --git a/src/drive_monitor/mod.rs b/src/drive_monitor/mod.rs index 95663671..bf61fbbb 100644 --- a/src/drive_monitor/mod.rs +++ b/src/drive_monitor/mod.rs @@ -2,8 +2,8 @@ use crate::basic::compiler::BasicCompiler; use crate::kb::embeddings; use crate::kb::qdrant_client; use crate::shared::state::AppState; +use aws_sdk_s3::Client; use log::{debug, error, info, warn}; -use opendal::Operator; use std::collections::HashMap; use std::error::Error; use std::sync::Arc; @@ -34,7 +34,10 @@ impl DriveMonitor { pub fn spawn(self: Arc) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - info!("Drive Monitor service started for bucket: {}", self.bucket_name); + info!( + "Drive Monitor service started for bucket: {}", + self.bucket_name + ); let mut tick = interval(Duration::from_secs(30)); loop { tick.tick().await; @@ -46,17 +49,17 @@ impl DriveMonitor { } async fn check_for_changes(&self) -> Result<(), Box> { - let op = match &self.state.s3_operator { - Some(op) => op, + let client = match &self.state.s3_client { + Some(client) => client, None => { return Ok(()); } }; - self.check_gbdialog_changes(op).await?; - self.check_gbkb_changes(op).await?; - - if let Err(e) = self.check_default_gbot(op).await { + self.check_gbdialog_changes(client).await?; + self.check_gbkb_changes(client).await?; + + if let Err(e) = self.check_gbot(client).await { error!("Error checking default bot config: {}", e); } @@ -65,40 +68,57 @@ impl DriveMonitor { async fn check_gbdialog_changes( &self, - op: &Operator, + client: &Client, ) -> Result<(), Box> { let prefix = ".gbdialog/"; - + let mut current_files = HashMap::new(); - - let mut lister = op.lister_with(prefix).recursive(true).await?; - while let Some(entry) = futures::TryStreamExt::try_next(&mut lister).await? 
{ - let path = entry.path().to_string(); - - if path.ends_with('/') || !path.ends_with(".bas") { - continue; + + let mut continuation_token = None; + loop { + let list_objects = client + .list_objects_v2() + .bucket(&self.bucket_name.to_lowercase()) + .set_continuation_token(continuation_token) + .send() + .await?; + debug!("List objects result: {:?}", list_objects); + + for obj in list_objects.contents.unwrap_or_default() { + let path = obj.key().unwrap_or_default().to_string(); + let path_parts: Vec<&str> = path.split('/').collect(); + if path_parts.len() < 2 || !path_parts[0].ends_with(".gbdialog") { + continue; + } + if path.ends_with('/') || !path.ends_with(".bas") { + continue; + } + + let file_state = FileState { + path: path.clone(), + size: obj.size().unwrap_or(0), + etag: obj.e_tag().unwrap_or_default().to_string(), + last_modified: obj.last_modified().map(|dt| dt.to_string()), + }; + current_files.insert(path, file_state); } - let meta = op.stat(&path).await?; - let file_state = FileState { - path: path.clone(), - size: meta.content_length() as i64, - etag: meta.etag().unwrap_or_default().to_string(), - last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()), - }; - current_files.insert(path, file_state); + if !list_objects.is_truncated.unwrap_or(false) { + break; + } + continuation_token = list_objects.next_continuation_token; } let mut file_states = self.file_states.write().await; for (path, current_state) in current_files.iter() { if let Some(previous_state) = file_states.get(path) { if current_state.etag != previous_state.etag { - if let Err(e) = self.compile_tool(op, path).await { + if let Err(e) = self.compile_tool(client, path).await { error!("Failed to compile tool {}: {}", path, e); } } } else { - if let Err(e) = self.compile_tool(op, path).await { + if let Err(e) = self.compile_tool(client, path).await { error!("Failed to compile tool {}: {}", path, e); } } @@ -125,45 +145,67 @@ impl DriveMonitor { async fn check_gbkb_changes( &self, - op: &Operator, + client: &Client, ) -> Result<(), Box> { let prefix = ".gbkb/"; - + let mut current_files = HashMap::new(); - - let mut lister = op.lister_with(prefix).recursive(true).await?; - while let Some(entry) = futures::TryStreamExt::try_next(&mut lister).await? 
{ - let path = entry.path().to_string(); - - if path.ends_with('/') { - continue; + + let mut continuation_token = None; + loop { + let list_objects = client + .list_objects_v2() + .bucket(&self.bucket_name.to_lowercase()) + .prefix(prefix) + .set_continuation_token(continuation_token) + .send() + .await?; + debug!("List objects result: {:?}", list_objects); + + for obj in list_objects.contents.unwrap_or_default() { + let path = obj.key().unwrap_or_default().to_string(); + + let path_parts: Vec<&str> = path.split('/').collect(); + if path_parts.len() < 2 || !path_parts[0].ends_with(".gbkb") { + continue; + } + + + + if path.ends_with('/') { + continue; + } + + let ext = path.rsplit('.').next().unwrap_or("").to_lowercase(); + if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) { + continue; + } + + let file_state = FileState { + path: path.clone(), + size: obj.size().unwrap_or(0), + etag: obj.e_tag().unwrap_or_default().to_string(), + last_modified: obj.last_modified().map(|dt| dt.to_string()), + }; + current_files.insert(path, file_state); } - let ext = path.rsplit('.').next().unwrap_or("").to_lowercase(); - if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) { - continue; + if !list_objects.is_truncated.unwrap_or(false) { + break; } - - let meta = op.stat(&path).await?; - let file_state = FileState { - path: path.clone(), - size: meta.content_length() as i64, - etag: meta.etag().unwrap_or_default().to_string(), - last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()), - }; - current_files.insert(path, file_state); + continuation_token = list_objects.next_continuation_token; } let mut file_states = self.file_states.write().await; for (path, current_state) in current_files.iter() { if let Some(previous_state) = file_states.get(path) { if current_state.etag != previous_state.etag { - if let Err(e) = self.index_document(op, path).await { + if let Err(e) = self.index_document(client, path).await { error!("Failed to index document {}: {}", path, e); } } } else { - if let Err(e) = self.index_document(op, path).await { + if let Err(e) = self.index_document(client, path).await { error!("Failed to index document {}: {}", path, e); } } @@ -188,38 +230,103 @@ impl DriveMonitor { Ok(()) } - async fn check_default_gbot( + async fn check_gbot( &self, - op: &Operator, + client: &Client, ) -> Result<(), Box> { - let prefix = format!("{}default.gbot/", self.bucket_name); - let config_key = format!("{}config.csv", prefix); - - match op.stat(&config_key).await { - Ok(_) => { - let content = op.read(&config_key).await?; - let csv_content = String::from_utf8(content.to_vec()) - .map_err(|e| format!("UTF-8 error in config.csv: {}", e))?; - debug!("Found config.csv: {} bytes", csv_content.len()); - Ok(()) + let prefix = ".gbot/"; + let mut continuation_token = None; + + loop { + let list_objects = client + .list_objects_v2() + .bucket(&self.bucket_name.to_lowercase()) + .prefix(prefix) + .set_continuation_token(continuation_token) + .send() + .await?; + + for obj in list_objects.contents.unwrap_or_default() { + let path = obj.key().unwrap_or_default().to_string(); + let path_parts: Vec<&str> = path.split('/').collect(); + + if path_parts.len() < 2 || !path_parts[0].ends_with(".gbot") { + continue; + } + + if !path.ends_with("config.csv") { + continue; + } + + debug!("Checking config file at path: {}", path); + match client + .head_object() + .bucket(&self.bucket_name) + .key(&path) + .send() + .await + { + Ok(head_res) => { + debug!("HeadObject successful for {}, metadata: {:?}", path, 
head_res); + let response = client + .get_object() + .bucket(&self.bucket_name) + .key(&path) + .send() + .await?; + debug!("GetObject successful for {}, content length: {}", path, response.content_length().unwrap_or(0)); + + let bytes = response.body.collect().await?.into_bytes(); + debug!("Collected {} bytes for {}", bytes.len(), path); + let csv_content = String::from_utf8(bytes.to_vec()) + .map_err(|e| format!("UTF-8 error in {}: {}", path, e))?; + debug!("Found {}: {} bytes", path, csv_content.len()); + } + Err(e) => { + debug!("Config file {} not found or inaccessible: {}", path, e); + } + } } - Err(e) => { - debug!("Config file not found or inaccessible: {}", e); - Ok(()) + + if !list_objects.is_truncated.unwrap_or(false) { + break; } + continuation_token = list_objects.next_continuation_token; } + + Ok(()) } async fn compile_tool( &self, - op: &Operator, + client: &Client, file_path: &str, ) -> Result<(), Box> { - let content = op.read(file_path).await?; - let source_content = String::from_utf8(content.to_vec())?; + debug!("Fetching object from S3: bucket={}, key={}", &self.bucket_name, file_path); + let response = match client + .get_object() + .bucket(&self.bucket_name) + .key(file_path) + .send() + .await { + Ok(res) => { + debug!("Successfully fetched object from S3: bucket={}, key={}, size={}", + &self.bucket_name, file_path, res.content_length().unwrap_or(0)); + res + } + Err(e) => { + error!("Failed to fetch object from S3: bucket={}, key={}, error={:?}", + &self.bucket_name, file_path, e); + return Err(e.into()); + } + }; + + let bytes = response.body.collect().await?.into_bytes(); + let source_content = String::from_utf8(bytes.to_vec())?; let tool_name = file_path - .strip_prefix(".gbdialog/") + .split('/') + .last() .unwrap_or(file_path) .strip_suffix(".bas") .unwrap_or(file_path) @@ -229,7 +336,7 @@ impl DriveMonitor { .bucket_name .strip_suffix(".gbai") .unwrap_or(&self.bucket_name); - let work_dir = format!("./work/{}.gbai/.gbdialog", bot_name); + let work_dir = format!("./work/{}.gbai/{}.gbdialog", bot_name, bot_name); std::fs::create_dir_all(&work_dir)?; let local_source_path = format!("{}/{}.bas", work_dir, tool_name); @@ -254,7 +361,7 @@ impl DriveMonitor { async fn index_document( &self, - op: &Operator, + client: &Client, file_path: &str, ) -> Result<(), Box> { let parts: Vec<&str> = file_path.split('/').collect(); @@ -264,9 +371,14 @@ impl DriveMonitor { } let collection_name = parts[1]; - let content = op.read(file_path).await?; - let bytes = content.to_vec(); - + let response = client + .get_object() + .bucket(&self.bucket_name) + .key(file_path) + .send() + .await?; + let bytes = response.body.collect().await?.into_bytes(); + let text_content = self.extract_text(file_path, &bytes)?; if text_content.trim().is_empty() { warn!("No text extracted from: {}", file_path); @@ -281,7 +393,7 @@ impl DriveMonitor { let qdrant_collection = format!("kb_default_{}", collection_name); qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?; - + embeddings::index_document(&self.state, &qdrant_collection, file_path, &text_content) .await?; diff --git a/src/file/mod.rs b/src/file/mod.rs index bf1530ea..2f9d2927 100644 --- a/src/file/mod.rs +++ b/src/file/mod.rs @@ -3,10 +3,12 @@ use crate::shared::state::AppState; use actix_multipart::Multipart; use actix_web::web; use actix_web::{post, HttpResponse}; -use opendal::Operator; +use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder}; +use aws_config::BehaviorVersion; use std::io::Write; 
use tempfile::NamedTempFile; use tokio_stream::StreamExt as TokioStreamExt; +// Removed unused import #[post("/files/upload/{folder_path}")] pub async fn upload_file( @@ -40,13 +42,13 @@ pub async fn upload_file( let file_name = file_name.unwrap_or_else(|| "unnamed_file".to_string()); let temp_file_path = temp_file.into_temp_path(); - let op = state.get_ref().s3_operator.as_ref().ok_or_else(|| { - actix_web::error::ErrorInternalServerError("S3 operator is not initialized") + let client = state.get_ref().s3_client.as_ref().ok_or_else(|| { + actix_web::error::ErrorInternalServerError("S3 client is not initialized") })?; let s3_key = format!("{}/{}", folder_path, file_name); - match upload_to_s3(op, &s3_key, &temp_file_path).await { + match upload_to_s3(client, &state.get_ref().bucket_name, &s3_key, &temp_file_path).await { Ok(_) => { let _ = std::fs::remove_file(&temp_file_path); Ok(HttpResponse::Ok().body(format!( @@ -64,27 +66,149 @@ pub async fn upload_file( } } -pub async fn init_drive(config: &DriveConfig) -> Result> { - use opendal::services::S3; - use opendal::Operator; - let client = Operator::new( - S3::default() - .root("/") - .endpoint(&config.server) - .access_key_id(&config.access_key) - .secret_access_key(&config.secret_key), - )? - .finish(); +pub async fn aws_s3_bucket_delete( + bucket: &str, + endpoint: &str, + access_key: &str, + secret_key: &str, +) -> Result<(), Box> { + let config = aws_config::defaults(BehaviorVersion::latest()) + .endpoint_url(endpoint) + .region("auto") + .credentials_provider( + aws_sdk_s3::config::Credentials::new( + access_key.to_string(), + secret_key.to_string(), + None, + None, + "static", + ) + ) + .load() + .await; - Ok(client) + let client = S3Client::new(&config); + client.delete_bucket() + .bucket(bucket) + .send() + .await?; + Ok(()) +} + +pub async fn aws_s3_bucket_create( + bucket: &str, + endpoint: &str, + access_key: &str, + secret_key: &str, +) -> Result<(), Box> { + let config = aws_config::defaults(BehaviorVersion::latest()) + .endpoint_url(endpoint) + .region("auto") + .credentials_provider( + aws_sdk_s3::config::Credentials::new( + access_key.to_string(), + secret_key.to_string(), + None, + None, + "static", + ) + ) + .load() + .await; + + let client = S3Client::new(&config); + client.create_bucket() + .bucket(bucket) + .send() + .await?; + Ok(()) +} + +pub async fn init_drive(config: &DriveConfig) -> Result> { + let endpoint = if !config.server.ends_with('/') { + format!("{}/", config.server) + } else { + config.server.clone() + }; + + let base_config = aws_config::defaults(BehaviorVersion::latest()) + .endpoint_url(endpoint) + .region("auto") + .credentials_provider( + aws_sdk_s3::config::Credentials::new( + config.access_key.clone(), + config.secret_key.clone(), + None, + None, + "static", + ) + ) + .load() + .await; + + let s3_config = S3ConfigBuilder::from(&base_config) + .force_path_style(true) + .build(); + + Ok(S3Client::from_conf(s3_config)) } async fn upload_to_s3( - op: &Operator, + client: &S3Client, + bucket: &str, key: &str, file_path: &std::path::Path, ) -> Result<(), Box> { let data = std::fs::read(file_path)?; - op.write(key, data).await?; + client.put_object() + .bucket(bucket) + .key(key) + .body(data.into()) + .send() + .await?; Ok(()) } + +async fn create_s3_client( + +) -> Result> { + let config = DriveConfig { + server: std::env::var("DRIVE_SERVER").expect("DRIVE_SERVER not set"), + access_key: std::env::var("DRIVE_ACCESS_KEY").expect("DRIVE_ACCESS_KEY not set"), + secret_key: 
std::env::var("DRIVE_SECRET_KEY").expect("DRIVE_SECRET_KEY not set"), + org_prefix: "".to_string(), + use_ssl: false, + }; + Ok(init_drive(&config).await?) +} + +pub async fn bucket_exists(client: &S3Client, bucket: &str) -> Result> { + match client.head_bucket().bucket(bucket).send().await { + Ok(_) => Ok(true), + Err(e) => { + if e.to_string().contains("NoSuchBucket") { + Ok(false) + } else { + Err(Box::new(e)) + } + } + } +} + +pub async fn create_bucket(client: &S3Client, bucket: &str) -> Result<(), Box> { + client.create_bucket() + .bucket(bucket) + .send() + .await?; + Ok(()) +} + +#[cfg(test)] +mod bucket_tests { + include!("tests/bucket_tests.rs"); +} + +#[cfg(test)] +mod tests { + include!("tests/tests.rs"); +} diff --git a/src/file/tests/bucket_tests.rs b/src/file/tests/bucket_tests.rs new file mode 100644 index 00000000..ade3cac3 --- /dev/null +++ b/src/file/tests/bucket_tests.rs @@ -0,0 +1,70 @@ +use super::*; +use aws_sdk_s3::Client as S3Client; +use std::env; + +#[tokio::test] +async fn test_aws_s3_bucket_create() { + if env::var("CI").is_ok() { + return; // Skip in CI environment + } + + let bucket = "test-bucket-aws"; + let endpoint = "http://localhost:4566"; // LocalStack default endpoint + let access_key = "test"; + let secret_key = "test"; + + match aws_s3_bucket_create(bucket, endpoint, access_key, secret_key).await { + Ok(_) => { + // Verify bucket exists + let config = aws_config::defaults(BehaviorVersion::latest()) + .endpoint_url(endpoint) + .region("auto") + .load() + .await; + let client = S3Client::new(&config); + + let exists = bucket_exists(&client, bucket).await.unwrap_or(false); + assert!(exists, "Bucket should exist after creation"); + }, + Err(e) => { + println!("Bucket creation failed: {:?}", e); + } + } +} + +#[tokio::test] +async fn test_aws_s3_bucket_delete() { + if env::var("CI").is_ok() { + return; // Skip in CI environment + } + + let bucket = "test-delete-bucket-aws"; + let endpoint = "http://localhost:4566"; // LocalStack default endpoint + let access_key = "test"; + let secret_key = "test"; + + // First create the bucket + if let Err(e) = aws_s3_bucket_create(bucket, endpoint, access_key, secret_key).await { + println!("Failed to create test bucket: {:?}", e); + return; + } + + // Then test deletion + match aws_s3_bucket_delete(bucket, endpoint, access_key, secret_key).await { + Ok(_) => { + // Verify bucket no longer exists + let config = aws_config::defaults(BehaviorVersion::latest()) + .endpoint_url(endpoint) + .region("auto") + .load() + .await; + let client = S3Client::new(&config); + + let exists = bucket_exists(&client, bucket).await.unwrap_or(false); + assert!(!exists, "Bucket should not exist after deletion"); + }, + Err(e) => { + println!("Bucket deletion failed: {:?}", e); + } + } +} diff --git a/src/file/tests/tests.rs b/src/file/tests/tests.rs new file mode 100644 index 00000000..072f8bf3 --- /dev/null +++ b/src/file/tests/tests.rs @@ -0,0 +1,80 @@ +use super::*; + +#[tokio::test] +async fn test_create_s3_client() { + if std::env::var("CI").is_ok() { + return; // Skip in CI environment + } + + // Setup test environment variables + std::env::set_var("DRIVE_SERVER", "http://localhost:9000"); + std::env::set_var("DRIVE_ACCESS_KEY", "minioadmin"); + std::env::set_var("DRIVE_SECRET_KEY", "minioadmin"); + + match create_s3_client().await { + Ok(client) => { + // Verify client creation + assert!(client.config().region().is_some()); + + // Test bucket operations + if let Err(e) = create_bucket(&client, "test.gbai").await { + println!("Bucket 
creation failed: {:?}", e); + } + }, + Err(e) => { + // Skip if no S3 server available + println!("S3 client creation failed: {:?}", e); + } + } + + // Cleanup + std::env::remove_var("DRIVE_SERVER"); + std::env::remove_var("DRIVE_ACCESS_KEY"); + std::env::remove_var("DRIVE_SECRET_KEY"); +} + +#[tokio::test] +async fn test_bucket_exists() { + if std::env::var("CI").is_ok() { + return; // Skip in CI environment + } + + // Setup test environment variables + std::env::set_var("DRIVE_SERVER", "http://localhost:9000"); + std::env::set_var("DRIVE_ACCESS_KEY", "minioadmin"); + std::env::set_var("DRIVE_SECRET_KEY", "minioadmin"); + + match create_s3_client().await { + Ok(client) => { + // Verify client creation + assert!(client.config().region().is_some()); + }, + Err(e) => { + // Skip if no S3 server available + println!("S3 client creation failed: {:?}", e); + } + } +} + +#[tokio::test] +async fn test_create_bucket() { + if std::env::var("CI").is_ok() { + return; // Skip in CI environment + } + + // Setup test environment variables + std::env::set_var("DRIVE_SERVER", "http://localhost:9000"); + std::env::set_var("DRIVE_ACCESS_KEY", "minioadmin"); + std::env::set_var("DRIVE_SECRET_KEY", "minioadmin"); + + match create_s3_client().await { + Ok(client) => { + // Verify client creation + assert!(client.config().region().is_some()); + }, + Err(e) => { + // Skip if no S3 server available + println!("S3 client creation failed: {:?}", e); + } + } +} diff --git a/src/kb/minio_handler.rs b/src/kb/minio_handler.rs index 82608bac..675303be 100644 --- a/src/kb/minio_handler.rs +++ b/src/kb/minio_handler.rs @@ -1,7 +1,6 @@ use crate::shared::state::AppState; use log::error; -use opendal::Operator; -use tokio_stream::StreamExt; +use aws_sdk_s3::Client; use std::collections::HashMap; use std::error::Error; use std::sync::Arc; @@ -17,14 +16,32 @@ pub struct FileState { pub struct MinIOHandler { state: Arc, + s3: Arc, watched_prefixes: Arc>>, file_states: Arc>>, } +pub async fn get_file_content( + client: &aws_sdk_s3::Client, + bucket: &str, + key: &str +) -> Result, Box> { + let response = client.get_object() + .bucket(bucket) + .key(key) + .send() + .await?; + + let bytes = response.body.collect().await?.into_bytes().to_vec(); + Ok(bytes) +} + impl MinIOHandler { pub fn new(state: Arc) -> Self { + let client = state.s3_client.as_ref().expect("S3 client must be initialized").clone(); Self { - state, + state: Arc::clone(&state), + s3: Arc::new(client), watched_prefixes: Arc::new(tokio::sync::RwLock::new(Vec::new())), file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())), } @@ -61,16 +78,9 @@ impl MinIOHandler { &self, callback: &Arc, ) -> Result<(), Box> { - let op = match &self.state.s3_operator { - Some(op) => op, - None => { - return Ok(()); - } - }; - let prefixes = self.watched_prefixes.read().await; for prefix in prefixes.iter() { - if let Err(e) = self.check_prefix_changes(op, prefix, callback).await { + if let Err(e) = self.check_prefix_changes(&self.s3, prefix, callback).await { error!("Error checking prefix {}: {}", prefix, e); } } @@ -79,28 +89,41 @@ impl MinIOHandler { async fn check_prefix_changes( &self, - op: &Operator, + client: &Client, prefix: &str, callback: &Arc, ) -> Result<(), Box> { let mut current_files = HashMap::new(); - let mut lister = op.lister_with(prefix).recursive(true).await?; - while let Some(entry) = lister.try_next().await? 
{ - let path = entry.path().to_string(); - - if path.ends_with('/') { - continue; + let mut continuation_token = None; + loop { + let list_objects = client.list_objects_v2() + .bucket(&self.state.bucket_name) + .prefix(prefix) + .set_continuation_token(continuation_token) + .send() + .await?; + + for obj in list_objects.contents.unwrap_or_default() { + let path = obj.key().unwrap_or_default().to_string(); + + if path.ends_with('/') { + continue; + } + + let file_state = FileState { + path: path.clone(), + size: obj.size().unwrap_or(0), + etag: obj.e_tag().unwrap_or_default().to_string(), + last_modified: obj.last_modified().map(|dt| dt.to_string()), + }; + current_files.insert(path, file_state); } - let meta = op.stat(&path).await?; - let file_state = FileState { - path: path.clone(), - size: meta.content_length() as i64, - etag: meta.etag().unwrap_or_default().to_string(), - last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()), - }; - current_files.insert(path, file_state); + if !list_objects.is_truncated.unwrap_or(false) { + break; + } + continuation_token = list_objects.next_continuation_token; } let mut file_states = self.file_states.write().await; @@ -146,7 +169,7 @@ impl MinIOHandler { pub async fn get_file_state(&self, path: &str) -> Option { let states = self.file_states.read().await; - states.get(path).cloned() + states.get(&path.to_string()).cloned() } pub async fn clear_state(&self) { diff --git a/src/kb/mod.rs b/src/kb/mod.rs index 084236bb..771a1fdc 100644 --- a/src/kb/mod.rs +++ b/src/kb/mod.rs @@ -1,7 +1,8 @@ use crate::shared::models::KBCollection; use crate::shared::state::AppState; use log::{ error, info, warn}; -use tokio_stream::StreamExt; +// Removed unused import +// Removed duplicate import since we're using the module directly use std::collections::HashMap; use std::error::Error; use std::sync::Arc; @@ -95,35 +96,16 @@ impl KBManager { &self, collection: &KBCollection, ) -> Result<(), Box> { - let op = match &self.state.s3_operator { - Some(op) => op, + let _client = match &self.state.s3_client { + Some(client) => client, None => { - warn!("S3 operator not configured"); + warn!("S3 client not configured"); return Ok(()); } }; - let mut lister = op.lister_with(&collection.folder_path).recursive(true).await?; - while let Some(entry) = lister.try_next().await? 
{ - let path = entry.path().to_string(); - - if path.ends_with('/') { - continue; - } - - let meta = op.stat(&path).await?; - if let Err(e) = self - .process_file( - &collection, - &path, - meta.content_length() as i64, - meta.last_modified().map(|dt| dt.to_rfc3339()), - ) - .await - { - error!("Error processing file {}: {}", path, e); - } - } + let minio_handler = minio_handler::MinIOHandler::new(self.state.clone()); + minio_handler.watch_prefix(collection.folder_path.clone()).await; Ok(()) } @@ -135,7 +117,8 @@ impl KBManager { file_size: i64, _last_modified: Option, ) -> Result<(), Box> { - let content = self.get_file_content(file_path).await?; + let client = self.state.s3_client.as_ref().ok_or("S3 client not configured")?; + let content = minio_handler::get_file_content(client, &self.state.bucket_name, file_path).await?; let file_hash = if content.len() > 100 { format!( "{:x}_{:x}_{}", @@ -183,20 +166,6 @@ impl KBManager { Ok(()) } - async fn get_file_content( - &self, - file_path: &str, - ) -> Result, Box> { - let op = self - .state - .s3_operator - .as_ref() - .ok_or("S3 operator not configured")?; - - let content = op.read(file_path).await?; - Ok(content.to_vec()) - } - async fn extract_text( &self, file_path: &str, diff --git a/src/main.rs b/src/main.rs index fddf2d36..a16bf34d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,6 +36,7 @@ mod tools; mod web_automation; mod web_server; mod whatsapp; +mod create_bucket; use crate::auth::auth_handler; use crate::automation::AutomationService; @@ -43,7 +44,6 @@ use crate::bootstrap::BootstrapManager; use crate::bot::{start_session, websocket_handler}; use crate::channels::{VoiceAdapter, WebChannelAdapter}; use crate::config::AppConfig; -use crate::drive_monitor::DriveMonitor; #[cfg(feature = "email")] use crate::email::{ get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email, @@ -59,10 +59,17 @@ use crate::shared::state::AppState; use crate::web_server::{bot_index, index, static_files}; use crate::whatsapp::whatsapp_webhook_verify; use crate::whatsapp::WhatsAppAdapter; +use crate::bot::BotOrchestrator; #[cfg(not(feature = "desktop"))] #[tokio::main] async fn main() -> std::io::Result<()> { + // Test bucket creation + match create_bucket::create_bucket("test-bucket") { + Ok(_) => println!("Bucket created successfully"), + Err(e) => eprintln!("Failed to create bucket: {}", e), + } + let args: Vec = std::env::args().collect(); if args.len() > 1 { let command = &args[1]; @@ -89,6 +96,7 @@ async fn main() -> std::io::Result<()> { } } + // Rest of the original main function remains unchanged... 
dotenv().ok(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")) .write_style(env_logger::WriteStyle::Always) @@ -106,7 +114,7 @@ async fn main() -> std::io::Result<()> { None }; - let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()); + let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()).await; // Prevent double bootstrap: skip if environment already initialized let env_path = std::env::current_dir()?.join("botserver-stack").join(".env"); @@ -120,7 +128,7 @@ async fn main() -> std::io::Result<()> { Err(_) => AppConfig::from_env(), } } else { - match bootstrap.bootstrap() { + match bootstrap.bootstrap().await { Ok(config) => { info!("Bootstrap completed successfully"); config @@ -138,9 +146,13 @@ async fn main() -> std::io::Result<()> { } }; - - let _ = bootstrap.start_all(); - if let Err(e) = bootstrap.upload_templates_to_drive(&cfg).await { + // Start all services (synchronous) + if let Err(e) = bootstrap.start_all() { + log::warn!("Failed to start all services: {}", e); + } + + // Upload templates (asynchronous) + if let Err(e) = futures::executor::block_on(bootstrap.upload_templates_to_drive(&cfg)) { log::warn!("Failed to upload templates to MinIO: {}", e); } @@ -193,7 +205,6 @@ async fn main() -> std::io::Result<()> { )); let tool_api = Arc::new(tools::ToolApi::new()); - let drive = init_drive(&config.drive) .await .expect("Failed to initialize Drive"); @@ -209,9 +220,10 @@ async fn main() -> std::io::Result<()> { ))); let app_state = Arc::new(AppState { - s3_operator: Some(drive.clone()), + s3_client: Some(drive), config: Some(cfg.clone()), conn: db_pool.clone(), + bucket_name: "default.gbai".to_string(), // Default bucket name custom_conn: db_custom_pool.clone(), redis_client: redis_client.clone(), session_manager: session_manager.clone(), @@ -246,19 +258,21 @@ async fn main() -> std::io::Result<()> { .expect("Failed to create runtime for automation"); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { - let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); - let scripts_dir = format!("work/{}.gbai/.gbdialog", bot_guid); + let scripts_dir = "work/default.gbai/.gbdialog".to_string(); let automation = AutomationService::new(automation_state, &scripts_dir); automation.spawn().await.ok(); }); }); - let drive_state = app_state.clone(); - let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); - let bucket_name = format!("{}{}.gbai", cfg.drive.org_prefix, bot_guid); - let drive_monitor = Arc::new(DriveMonitor::new(drive_state, bucket_name)); - let _drive_handle = drive_monitor.spawn(); + // Initialize bot orchestrator and mount all bots + let bot_orchestrator = BotOrchestrator::new(app_state.clone()); + + // Mount all active bots from database + if let Err(e) = bot_orchestrator.mount_all_bots().await { + log::error!("Failed to mount bots: {}", e); + } + HttpServer::new(move || { let cors = Cors::default() .allow_any_origin() @@ -271,25 +285,21 @@ async fn main() -> std::io::Result<()> { .wrap(cors) .wrap(Logger::default()) .wrap(Logger::new("HTTP REQUEST: %a %{User-Agent}i")) - .app_data(web::Data::from(app_state_clone)); - - app = app - .service(upload_file) - .service(index) - .service(static_files) - .service(websocket_handler) + .app_data(web::Data::from(app_state_clone)) .service(auth_handler) - .service(whatsapp_webhook_verify) + .service(chat_completions_local) + .service(create_session) + 
+            .service(embeddings_local)
+            .service(get_session_history)
+            .service(get_sessions)
+            .service(index)
+            .service(start_session)
+            .service(upload_file)
             .service(voice_start)
             .service(voice_stop)
-            .service(create_session)
-            .service(get_sessions)
-            .service(start_session)
-            .service(get_session_history)
-            .service(chat_completions_local)
-            .service(embeddings_local)
-            .service(bot_index); // Must be last - catches all remaining paths
-
+            .service(whatsapp_webhook_verify)
+            .service(websocket_handler);
+
         #[cfg(feature = "email")]
         {
             app = app
@@ -299,9 +309,12 @@ async fn main() -> std::io::Result<()> {
                 .service(send_email)
                 .service(save_draft)
                 .service(save_click);
-        }
+        }
+        app = app.service(static_files);
+        app = app.service(bot_index);
 
         app
+
     })
     .workers(worker_count)
     .bind((config.server.host.clone(), config.server.port))?
diff --git a/src/package_manager/installer.rs b/src/package_manager/installer.rs
index 8ee31cf4..f52dbefe 100644
--- a/src/package_manager/installer.rs
+++ b/src/package_manager/installer.rs
@@ -89,17 +89,9 @@ impl PackageManager {
             ),
             binary_name: Some("minio".to_string()),
             pre_install_cmds_linux: vec![],
-            post_install_cmds_linux: vec![
-                "wget https://dl.min.io/client/mc/release/linux-amd64/mc -O {{BIN_PATH}}/mc"
-                    .to_string(),
-                "chmod +x {{BIN_PATH}}/mc".to_string(),
-            ],
+            post_install_cmds_linux: vec![],
             pre_install_cmds_macos: vec![],
-            post_install_cmds_macos: vec![
-                "wget https://dl.min.io/client/mc/release/darwin-amd64/mc -O {{BIN_PATH}}/mc"
-                    .to_string(),
-                "chmod +x {{BIN_PATH}}/mc".to_string(),
-            ],
+            post_install_cmds_macos: vec![],
             pre_install_cmds_windows: vec![],
             post_install_cmds_windows: vec![],
             env_vars: HashMap::from([
@@ -107,7 +99,7 @@
                 ("DRIVE_ROOT_USER".to_string(), drive_user.clone()),
                 ("DRIVE_ROOT_PASSWORD".to_string(), drive_password.clone()),
             ]),
             data_download_list: Vec::new(),
-            exec_cmd: "nohup {{BIN_PATH}}/minio server {{DATA_PATH}} --address :9000 --console-address :9001 > {{LOGS_PATH}}/minio.log 2>&1 & sleep 5 && {{BIN_PATH}}/mc alias set drive http://localhost:9000 minioadmin minioadmin && {{BIN_PATH}}/mc admin user add drive $DRIVE_ROOT_USER $DRIVE_ROOT_PASSWORD && {{BIN_PATH}}/mc admin policy attach drive readwrite --user $DRIVE_ROOT_USER && {{BIN_PATH}}/mc mb drive/default.gbai || true".to_string(),
+            exec_cmd: "nohup {{BIN_PATH}}/minio server {{DATA_PATH}} --address :9000 --console-address :9001 > {{LOGS_PATH}}/minio.log 2>&1 &".to_string(),
         },
     );
diff --git a/src/session/mod.rs b/src/session/mod.rs
index ee291265..61c99e53 100644
--- a/src/session/mod.rs
+++ b/src/session/mod.rs
@@ -395,7 +395,7 @@ async fn create_session(data: web::Data<AppState>) -> Result<HttpResponse> {
 #[actix_web::get("/api/sessions")]
 async fn get_sessions(data: web::Data<AppState>) -> Result<HttpResponse> {
     let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
-    let orchestrator = BotOrchestrator::new(Arc::clone(&data));
+    let orchestrator = BotOrchestrator::new(Arc::new(data.get_ref().clone()));
     match orchestrator.get_user_sessions(user_id).await {
         Ok(sessions) => Ok(HttpResponse::Ok().json(sessions)),
         Err(e) => {
@@ -416,7 +416,7 @@ async fn get_session_history(
 
     match Uuid::parse_str(&session_id) {
         Ok(session_uuid) => {
-            let orchestrator = BotOrchestrator::new(Arc::clone(&data));
+            let orchestrator = BotOrchestrator::new(Arc::new(data.get_ref().clone()));
             match orchestrator
                 .get_conversation_history(session_uuid, user_id)
                 .await
diff --git a/src/shared/state.rs b/src/shared/state.rs
index fe39d97f..5ffadc84 100644
--- a/src/shared/state.rs
+++ b/src/shared/state.rs
@@ -6,8 +6,8 @@ use crate::session::SessionManager;
 use crate::tools::{ToolApi, ToolManager};
 use crate::whatsapp::WhatsAppAdapter;
 use diesel::{Connection, PgConnection};
-use opendal::Operator;
-use redis::Client;
+use aws_sdk_s3::Client as S3Client;
+use redis::Client as RedisClient;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::Mutex;
@@ -15,11 +15,12 @@ use tokio::sync::mpsc;
 use crate::shared::models::BotResponse;
 
 pub struct AppState {
-    pub s3_operator: Option<Operator>,
+    pub s3_client: Option<S3Client>,
+    pub bucket_name: String,
     pub config: Option<AppConfig>,
     pub conn: Arc<Mutex<PgConnection>>,
     pub custom_conn: Arc<Mutex<PgConnection>>,
-    pub redis_client: Option<Arc<Client>>,
+    pub redis_client: Option<Arc<RedisClient>>,
     pub session_manager: Arc<Mutex<SessionManager>>,
     pub tool_manager: Arc<ToolManager>,
     pub llm_provider: Arc,
@@ -35,7 +36,8 @@
 impl Clone for AppState {
     fn clone(&self) -> Self {
         Self {
-            s3_operator: self.s3_operator.clone(),
+            s3_client: self.s3_client.clone(),
+            bucket_name: self.bucket_name.clone(),
             config: self.config.clone(),
             conn: Arc::clone(&self.conn),
             custom_conn: Arc::clone(&self.custom_conn),
@@ -57,7 +59,8 @@ impl Clone for AppState {
 impl Default for AppState {
     fn default() -> Self {
         Self {
-            s3_operator: None,
+            s3_client: None,
+            bucket_name: "default.gbai".to_string(),
             config: None,
             conn: Arc::new(Mutex::new(
                 diesel::PgConnection::establish("postgres://localhost/test").unwrap(),
diff --git a/src/tests/integration_file_upload_test.rs b/src/tests/integration_file_upload_test.rs
index 868b739f..56af740f 100644
--- a/src/tests/integration_file_upload_test.rs
+++ b/src/tests/integration_file_upload_test.rs
@@ -13,6 +13,7 @@ use std::io::Write;
 use std::str::FromStr;
 use tempfile::NamedTempFile;
 
+
 #[tokio::test]
 async fn test_successful_file_upload() -> Result<()> {
     // Setup test environment and MinIO client
diff --git a/src/web_server/mod.rs b/src/web_server/mod.rs
index bcbebf37..a9a9fedf 100644
--- a/src/web_server/mod.rs
+++ b/src/web_server/mod.rs
@@ -26,7 +26,7 @@ async fn bot_index(req: HttpRequest) -> Result<HttpResponse> {
     }
 }
 
-#[actix_web::get("/{filename:.*}")]
+#[actix_web::get("/static/{filename:.*}")]
 async fn static_files(req: HttpRequest) -> Result<HttpResponse> {
     let filename = req.match_info().query("filename");
     let path = format!("web/html/{}", filename);