feat(automation): improve cron matching and job locking
- Refactor cron matching to use individual variables for each time component with additional debug logging
- Replace SETEX with atomic SET NX EX for job locking in Redis
- Add better error handling and logging for job execution tracking
- Skip execution if Redis is unavailable or job is already held
- Add verbose flag to LLM server startup command for better logging
This commit is contained in:
parent
4c279d2a19
commit
53b49ba616
4 changed files with 51 additions and 28 deletions
|
|
@ -245,15 +245,26 @@ impl AutomationService {
|
||||||
let day = dt.day() as i32;
|
let day = dt.day() as i32;
|
||||||
let month = dt.month() as i32;
|
let month = dt.month() as i32;
|
||||||
let weekday = dt.weekday().num_days_from_monday() as i32;
|
let weekday = dt.weekday().num_days_from_monday() as i32;
|
||||||
let match_result = [minute, hour, day, month, weekday]
|
|
||||||
.iter()
|
// More strict matching with additional logging
|
||||||
.enumerate()
|
let minute_match = Self::cron_part_matches(parts[0], minute);
|
||||||
.all(|(i, &val)| Self::cron_part_matches(parts[i], val));
|
let hour_match = Self::cron_part_matches(parts[1], hour);
|
||||||
|
let day_match = Self::cron_part_matches(parts[2], day);
|
||||||
|
let month_match = Self::cron_part_matches(parts[3], month);
|
||||||
|
let weekday_match = Self::cron_part_matches(parts[4], weekday);
|
||||||
|
|
||||||
|
let match_result = minute_match && hour_match && day_match && month_match && weekday_match;
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
"Cron pattern='{}' result={} at {}",
|
"Cron pattern='{}' result={} at {} (minute={}, hour={}, day={}, month={}, weekday={})",
|
||||||
pattern,
|
pattern,
|
||||||
match_result,
|
match_result,
|
||||||
dt
|
dt,
|
||||||
|
minute_match,
|
||||||
|
hour_match,
|
||||||
|
day_match,
|
||||||
|
month_match,
|
||||||
|
weekday_match
|
||||||
);
|
);
|
||||||
match_result
|
match_result
|
||||||
}
|
}
|
||||||
|
|
@ -288,30 +299,42 @@ impl AutomationService {
|
||||||
match redis_client.get_multiplexed_async_connection().await {
|
match redis_client.get_multiplexed_async_connection().await {
|
||||||
Ok(mut conn) => {
|
Ok(mut conn) => {
|
||||||
trace!("Connected to Redis; checking if job '{}' is running", param);
|
trace!("Connected to Redis; checking if job '{}' is running", param);
|
||||||
let is_running: Result<bool, redis::RedisError> = redis::cmd("EXISTS")
|
|
||||||
|
// Use SET with NX (only set if not exists) and EX (expire) for atomic operation
|
||||||
|
let set_result: Result<String, redis::RedisError> = redis::cmd("SET")
|
||||||
.arg(&redis_key)
|
.arg(&redis_key)
|
||||||
|
.arg("1")
|
||||||
|
.arg("NX")
|
||||||
|
.arg("EX")
|
||||||
|
.arg(300)
|
||||||
.query_async(&mut conn)
|
.query_async(&mut conn)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if let Ok(true) = is_running {
|
match set_result {
|
||||||
|
Ok(res) if res == "OK" => {
|
||||||
|
trace!("Acquired lock for job '{}'", param);
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
warn!(
|
warn!(
|
||||||
"Job '{}' is already running for bot '{}'; skipping execution",
|
"Job '{}' is already running for bot '{}'; skipping execution",
|
||||||
param, bot_id
|
param, bot_id
|
||||||
);
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Redis error checking job status for '{}': {}", param, e);
|
||||||
|
return Ok(()); // Skip execution if we can't verify lock status
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let _: Result<(), redis::RedisError> = redis::cmd("SETEX")
|
|
||||||
.arg(&redis_key)
|
|
||||||
.arg(300)
|
|
||||||
.arg("1")
|
|
||||||
.query_async(&mut conn)
|
|
||||||
.await;
|
|
||||||
trace!("Job '{}' marked as running in Redis", param);
|
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to connect to Redis for job tracking: {}", e);
|
warn!("Failed to connect to Redis for job tracking: {}", e);
|
||||||
|
return Ok(()); // Skip execution if we can't connect to Redis
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
warn!("Redis client not available for job tracking");
|
||||||
|
return Ok(()); // Skip execution if Redis isn't configured
|
||||||
}
|
}
|
||||||
|
|
||||||
let bot_name: String = {
|
let bot_name: String = {
|
||||||
|
|
|
||||||
|
|
@ -226,14 +226,14 @@ pub async fn start_llm_server(
|
||||||
if cfg!(windows) {
|
if cfg!(windows) {
|
||||||
let mut cmd = tokio::process::Command::new("cmd");
|
let mut cmd = tokio::process::Command::new("cmd");
|
||||||
cmd.arg("/C").arg(format!(
|
cmd.arg("/C").arg(format!(
|
||||||
"cd {} && .\\llama-server.exe {} >../../../../logs/llm/stdout.log",
|
"cd {} && .\\llama-server.exe {} --verbose>../../../../logs/llm/stdout.log",
|
||||||
llama_cpp_path, args
|
llama_cpp_path, args
|
||||||
));
|
));
|
||||||
cmd.spawn()?;
|
cmd.spawn()?;
|
||||||
} else {
|
} else {
|
||||||
let mut cmd = tokio::process::Command::new("sh");
|
let mut cmd = tokio::process::Command::new("sh");
|
||||||
cmd.arg("-c").arg(format!(
|
cmd.arg("-c").arg(format!(
|
||||||
"cd {} && ./llama-server {} >../../../../logs/llm/stdout.log 2>&1 &",
|
"cd {} && ./llama-server {} --verbose >../../../../logs/llm/stdout.log 2>&1 &",
|
||||||
llama_cpp_path, args
|
llama_cpp_path, args
|
||||||
));
|
));
|
||||||
cmd.spawn()?;
|
cmd.spawn()?;
|
||||||
|
|
@ -252,14 +252,14 @@ pub async fn start_embedding_server(
|
||||||
if cfg!(windows) {
|
if cfg!(windows) {
|
||||||
let mut cmd = tokio::process::Command::new("cmd");
|
let mut cmd = tokio::process::Command::new("cmd");
|
||||||
cmd.arg("/c").arg(format!(
|
cmd.arg("/c").arg(format!(
|
||||||
"cd {} && .\\llama-server.exe -m {} --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >../../../../logs/llm/stdout.log",
|
"cd {} && .\\llama-server.exe -m {} --verbose --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >../../../../logs/llm/stdout.log",
|
||||||
llama_cpp_path, model_path, port
|
llama_cpp_path, model_path, port
|
||||||
));
|
));
|
||||||
cmd.spawn()?;
|
cmd.spawn()?;
|
||||||
} else {
|
} else {
|
||||||
let mut cmd = tokio::process::Command::new("sh");
|
let mut cmd = tokio::process::Command::new("sh");
|
||||||
cmd.arg("-c").arg(format!(
|
cmd.arg("-c").arg(format!(
|
||||||
"cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >../../../../logs/llm/stdout.log 2>&1 &",
|
"cd {} && ./llama-server -m {} --verbose --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >../../../../logs/llm/stdout.log 2>&1 &",
|
||||||
llama_cpp_path, model_path, port
|
llama_cpp_path, model_path, port
|
||||||
));
|
));
|
||||||
cmd.spawn()?;
|
cmd.spawn()?;
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
SET_SCHEDULE "37 * * * *"
|
SET_SCHEDULE "37 * * * *"
|
||||||
|
|
||||||
let text = GET "announcements.gbkb/news/news.pdf"
|
let text = GET "announcements.gbkb/news/news.pdf"
|
||||||
let resume = LLM "In a short phrase, resume this: " + text
|
let resume = LLM "In a few words, resume this: " + text
|
||||||
|
|
||||||
SET_BOT_MEMORY "resume", resume
|
SET_BOT_MEMORY "resume", resume
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,8 +17,8 @@ llm-server-host,0.0.0.0
|
||||||
llm-server-port,8081
|
llm-server-port,8081
|
||||||
llm-server-gpu-layers,0
|
llm-server-gpu-layers,0
|
||||||
llm-server-n-moe,0
|
llm-server-n-moe,0
|
||||||
llm-server-ctx-size,2048
|
llm-server-ctx-size,512
|
||||||
llm-server-parallel,2
|
llm-server-parallel,6
|
||||||
llm-server-cont-batching,true
|
llm-server-cont-batching,true
|
||||||
llm-server-mlock,false
|
llm-server-mlock,false
|
||||||
llm-server-no-mmap,false
|
llm-server-no-mmap,false
|
||||||
|
|
|
||||||
|
Can't render this file because it has a wrong number of fields in line 25.
|
Loading…
Add table
Reference in a new issue