From 5e7507588541e4daecfe21e9e8a77cd464671a03 Mon Sep 17 00:00:00 2001 From: maybleMyers Date: Thu, 4 Dec 2025 02:49:43 -0800 Subject: [PATCH] progress --- javascript/progressbar.js | 97 +++++++++++++++------ modules/api/api.py | 4 +- modules/progress.py | 176 +++++++++++++++++++++++--------------- 3 files changed, 182 insertions(+), 95 deletions(-) diff --git a/javascript/progressbar.js b/javascript/progressbar.js index 3bb9a300..a8b67a09 100644 --- a/javascript/progressbar.js +++ b/javascript/progressbar.js @@ -1,12 +1,7 @@ -// code related to showing and updating progressbar shown as the image is being made +// Progress bar with queue tracking and reconnection support -function rememberGallerySelection() { - -} - -function getGallerySelectedIndex() { - -} +function rememberGallerySelection() {} +function getGallerySelectedIndex() {} function request(url, data, handler, errorHandler) { var xhr = new XMLHttpRequest(); @@ -69,14 +64,18 @@ function randomId() { return "task(" + Math.random().toString(36).slice(2, 7) + Math.random().toString(36).slice(2, 7) + Math.random().toString(36).slice(2, 7) + ")"; } -// starts sending progress requests to "/internal/progress" uri, creating progressbar above progressbarContainer element and -// preview inside gallery element. Cleans up all created stuff when the task is over and calls atEnd. -// calls onProgress every time there is a progress update -function requestProgress(id_task, progressbarContainer, gallery, atEnd, onProgress, inactivityTimeout = 6000) { +// Progress request with reconnection support +// inactivityTimeout: time in ms before giving up on a task that was never queued/active +// Setting to 0 disables the timeout (for restore operations) +function requestProgress(id_task, progressbarContainer, gallery, atEnd, onProgress, inactivityTimeout = 60000) { var dateStart = new Date(); var wasEverActive = false; + var wasEverQueued = false; var parentProgressbar = progressbarContainer.parentNode; var wakeLock = null; + var isDestroyed = false; + var reconnectAttempts = 0; + var maxReconnectAttempts = 10; var requestWakeLock = async function() { if (!opts.prevent_screen_sleep_during_generation || wakeLock !== null) return; @@ -110,20 +109,51 @@ function requestProgress(id_task, progressbarContainer, gallery, atEnd, onProgre var livePreview = null; var removeProgressBar = function() { + isDestroyed = true; releaseWakeLock(); if (!divProgress) return; setTitle(""); - parentProgressbar.removeChild(divProgress); - if (gallery && livePreview) gallery.removeChild(livePreview); + try { + parentProgressbar.removeChild(divProgress); + } catch (e) {} + if (gallery && livePreview) { + try { + gallery.removeChild(livePreview); + } catch (e) {} + } atEnd(); divProgress = null; }; + var handleError = function() { + reconnectAttempts++; + if (reconnectAttempts >= maxReconnectAttempts) { + console.error('Max reconnect attempts reached, giving up'); + removeProgressBar(); + return; + } + // Exponential backoff: 1s, 2s, 3s, 4s, 5s (capped) + var delay = Math.min(reconnectAttempts, 5) * 1000; + console.log('Connection error, reconnecting in', delay, 'ms (attempt', reconnectAttempts + ')'); + setTimeout(function() { + if (!isDestroyed) { + funProgress(id_task); + } + }, delay); + }; + var funProgress = function(id_task) { + if (isDestroyed) return; + requestWakeLock(); request("./internal/progress", {id_task: id_task, live_preview: false}, function(res) { + if (isDestroyed) return; + + // Reset reconnect counter on success + reconnectAttempts = 0; + if (res.completed) { removeProgressBar(); return; @@ -152,14 +182,24 @@ function requestProgress(id_task, progressbarContainer, gallery, atEnd, onProgre var elapsedFromStart = (new Date() - dateStart) / 1000; - if (res.active) wasEverActive = true; + // Track if we've ever been in the queue or active + if (res.active) { + wasEverActive = true; + wasEverQueued = true; + } + if (res.queued) { + wasEverQueued = true; + } + // Task finished (was active, now isn't) if (!res.active && wasEverActive) { removeProgressBar(); return; } - if (elapsedFromStart > inactivityTimeout && !res.queued && !res.active) { + // Timeout check - only if we've never been queued/active and timeout is enabled + if (inactivityTimeout > 0 && elapsedFromStart * 1000 > inactivityTimeout && !res.queued && !res.active && !wasEverQueued) { + console.log('Inactivity timeout reached for task', id_task); removeProgressBar(); return; } @@ -169,22 +209,23 @@ function requestProgress(id_task, progressbarContainer, gallery, atEnd, onProgre } setTimeout(() => { - funProgress(id_task); // CORRECTED: Only pass id_task + funProgress(id_task); }, opts.live_preview_refresh_period || 500); - }, function() { - removeProgressBar(); - }); + }, handleError); }; var funLivePreview = function(id_task, id_live_preview) { - request("./internal/progress", {id_task: id_task, id_live_preview: id_live_preview}, function(res) { - if (!divProgress) { + if (isDestroyed) return; + + request("./internal/progress", {id_task: id_task, id_live_preview: id_live_preview, live_preview: true}, function(res) { + if (!divProgress || isDestroyed) { return; } if (res.live_preview && gallery) { var img = new Image(); img.onload = function() { + if (isDestroyed) return; if (!livePreview) { livePreview = document.createElement('div'); livePreview.className = 'livePreview'; @@ -200,17 +241,21 @@ function requestProgress(id_task, progressbarContainer, gallery, atEnd, onProgre } setTimeout(() => { - funLivePreview(id_task, res.id_live_preview); + funLivePreview(id_task, res.id_live_preview || id_live_preview); }, opts.live_preview_refresh_period || 500); }, function() { - removeProgressBar(); + // Don't give up on live preview errors, just retry + if (!isDestroyed) { + setTimeout(() => { + funLivePreview(id_task, id_live_preview); + }, 2000); + } }); }; - funProgress(id_task, 0); + funProgress(id_task); if (gallery) { funLivePreview(id_task, 0); } - } diff --git a/modules/api/api.py b/modules/api/api.py index 9754be03..e6a06682 100644 --- a/modules/api/api.py +++ b/modules/api/api.py @@ -31,7 +31,7 @@ from typing import Any, Union, get_origin, get_args import piexif import piexif.helper from contextlib import closing -from modules.progress import create_task_id, add_task_to_queue, start_task, finish_task, current_task +from modules.progress import create_task_id, add_task_to_queue, start_task, finish_task, get_current_task def script_name_to_index(name, scripts): try: @@ -640,7 +640,7 @@ class Api: if shared.state.current_image and not req.skip_current_image: current_image = encode_pil_to_base64(shared.state.current_image) - return models.ProgressResponse(progress=progress, eta_relative=eta_relative, state=shared.state.dict(), current_image=current_image, textinfo=shared.state.textinfo, current_task=current_task) + return models.ProgressResponse(progress=progress, eta_relative=eta_relative, state=shared.state.dict(), current_image=current_image, textinfo=shared.state.textinfo, current_task=get_current_task()) def interrogateapi(self, interrogatereq: models.InterrogateRequest): image_b64 = interrogatereq.image diff --git a/modules/progress.py b/modules/progress.py index 349564fe..473e5249 100644 --- a/modules/progress.py +++ b/modules/progress.py @@ -1,5 +1,5 @@ -# --- START OF FILE progress.py (MODIFIED ORIGINAL) --- -from __future__ import annotations # Keep if your Python version needs it +# Progress tracking module - now backed by thread-safe QueueManager +from __future__ import annotations import base64 import io import time @@ -7,13 +7,16 @@ import time import gradio as gr from pydantic import BaseModel, Field -from modules.shared import opts # Assuming this import is correct for your env - -import modules.shared as shared # Assuming this import is correct +from modules.shared import opts +import modules.shared as shared from collections import OrderedDict import string import random -from typing import List, Optional # Optional is good practice, or use | None +from typing import List, Optional +import threading + +# Thread-safe lock for all queue operations +_queue_lock = threading.RLock() current_task = None pending_tasks = OrderedDict() @@ -21,39 +24,71 @@ finished_tasks = [] recorded_results = [] recorded_results_limit = 2 +# Sequence counter for deterministic ordering +_task_sequence = 0 + + +def _next_sequence(): + """Get next sequence number""" + global _task_sequence + _task_sequence += 1 + return _task_sequence + def start_task(id_task): global current_task - current_task = id_task - pending_tasks.pop(id_task, None) + with _queue_lock: + current_task = id_task + pending_tasks.pop(id_task, None) def finish_task(id_task): global current_task - if current_task == id_task: - current_task = None - finished_tasks.append(id_task) - if len(finished_tasks) > 16: - finished_tasks.pop(0) + with _queue_lock: + if current_task == id_task: + current_task = None + finished_tasks.append(id_task) + if len(finished_tasks) > 16: + finished_tasks.pop(0) + def create_task_id(task_type): N = 7 res = ''.join(random.choices(string.ascii_uppercase + string.digits, k=N)) return f"task({task_type}-{res})" + def record_results(id_task, res): - recorded_results.append((id_task, res)) - if len(recorded_results) > recorded_results_limit: - recorded_results.pop(0) + with _queue_lock: + recorded_results.append((id_task, res)) + if len(recorded_results) > recorded_results_limit: + recorded_results.pop(0) def add_task_to_queue(id_job): - pending_tasks[id_job] = time.time() + with _queue_lock: + # Use sequence number for ordering instead of timestamp + # This ensures deterministic ordering even for rapid submissions + pending_tasks[id_job] = _next_sequence() + + +def get_current_task(): + """Thread-safe getter for current_task""" + with _queue_lock: + return current_task + + +def get_pending_count(): + """Thread-safe getter for pending tasks count""" + with _queue_lock: + return len(pending_tasks) + class PendingTasksResponse(BaseModel): size: int = Field(title="Pending task size") tasks: List[str] = Field(title="Pending task ids") + class ProgressRequest(BaseModel): id_task: str = Field(default=None, title="Task ID", description="id of the task to get progress for") id_live_preview: int = Field(default=-1, title="Live preview image ID", description="id of last received last preview image") @@ -64,80 +99,86 @@ class ProgressResponse(BaseModel): active: bool = Field(title="Whether the task is being worked on right now") queued: bool = Field(title="Whether the task is in queue") completed: bool = Field(title="Whether the task has already finished") - # FIX: Use | None or Optional[] for fields that can be None progress: float | None = Field(default=None, title="Progress", description="The progress with a range of 0 to 1") eta: float | None = Field(default=None, title="ETA in secs") live_preview: str | None = Field(default=None, title="Live preview image", description="Current live preview; a data: uri") id_live_preview: int | None = Field(default=None, title="Live preview image ID", description="Send this together with next request to prevent receiving same image") textinfo: str | None = Field(default=None, title="Info text", description="Info text used by WebUI.") + queue_position: int | None = Field(default=None, title="Queue position") + queue_total: int | None = Field(default=None, title="Total queue size") def setup_progress_api(app): app.add_api_route("/internal/pending-tasks", get_pending_tasks, methods=["GET"]) - # Ensure response_model matches the actual return possibilities return app.add_api_route("/internal/progress", progressapi, methods=["POST"], response_model=ProgressResponse) def get_pending_tasks(): - pending_tasks_ids = list(pending_tasks) - pending_len = len(pending_tasks_ids) + with _queue_lock: + pending_tasks_ids = list(pending_tasks) + pending_len = len(pending_tasks_ids) return PendingTasksResponse(size=pending_len, tasks=pending_tasks_ids) def progressapi(req: ProgressRequest): - active = req.id_task == current_task - queued = req.id_task in pending_tasks - completed = req.id_task in finished_tasks - #print(f"PYTHON progressapi: Request for id_task='{req.id_task}'. Current pending_tasks: {list(pending_tasks.keys())}. Current current_task: {current_task}. Timestamp: {time.time()}") - active = req.id_task == current_task - queued = req.id_task in pending_tasks # Check membership *before* any potential modifications - completed = req.id_task in finished_tasks + # Take a consistent snapshot under lock + with _queue_lock: + snapshot_current = current_task + snapshot_pending = dict(pending_tasks) + snapshot_finished = list(finished_tasks) - # Initialize all potentially None fields for the response - # This makes it clearer what's being returned, especially for the non-active case + # Calculate status from snapshot (no lock needed) + active = req.id_task == snapshot_current + queued = req.id_task in snapshot_pending + completed = req.id_task in snapshot_finished + + # Initialize response fields current_progress = None current_eta = None current_live_preview = None - current_id_live_preview = req.id_live_preview # Start with requested id + current_id_live_preview = req.id_live_preview current_textinfo = None + queue_position = None + queue_total = None if not active: - current_textinfo = "In queue :>" if queued: - # Ensure req.id_task is actually in pending_tasks before calling index - if req.id_task in pending_tasks: - sorted_queued = sorted(pending_tasks.keys(), key=lambda x: pending_tasks[x]) - try: - queue_index = sorted_queued.index(req.id_task) - current_textinfo = "In queue: {}/{}".format(queue_index + 1, len(sorted_queued)) - except ValueError: - # Should not happen if req.id_task in pending_tasks, but good for robustness - current_textinfo = "In queue (error finding position)" - else: - # Task is not active, not in pending_tasks. Could be completed or unknown. - if completed: - current_textinfo = "Completed" - else: - current_textinfo = "Status unknown" # Or some other appropriate default + # Calculate queue position from snapshot + sorted_queued = sorted(snapshot_pending.keys(), key=lambda x: snapshot_pending[x]) + try: + queue_position = sorted_queued.index(req.id_task) + 1 + queue_total = len(sorted_queued) + current_textinfo = f"In queue: {queue_position}/{queue_total}" + except ValueError: + current_textinfo = "In queue" + elif completed: + current_textinfo = "Completed" + elif snapshot_current is not None: + # Task not found but there's an active task - show as queued behind it + # This handles reconnection from another window + queue_total = len(snapshot_pending) + 1 + queue_position = queue_total + current_textinfo = f"In queue: {queue_position}/{queue_total}" + queued = True + else: + current_textinfo = None - # When not active, we only send back active, queued, completed, id_live_preview and textinfo. - # The Pydantic model will use default=None for progress, eta, live_preview. - #print(f"DEBUG: Responding for task {req.id_task}: active={active}, queued={queued}, completed={completed}, textinfo='{current_textinfo}'") return ProgressResponse( active=active, queued=queued, completed=completed, - progress=None, # Explicitly None - eta=None, # Explicitly None - live_preview=None, # Explicitly None - id_live_preview= -1 if not shared.state.id_live_preview else shared.state.id_live_preview, # or req.id_live_preview - textinfo=current_textinfo + progress=None, + eta=None, + live_preview=None, + id_live_preview=-1 if not shared.state.id_live_preview else shared.state.id_live_preview, + textinfo=current_textinfo, + queue_position=queue_position, + queue_total=queue_total ) - # This part is ONLY reached if active is True - current_progress = 0.0 # Default to float if active + # Task is active - calculate progress + current_progress = 0.0 - # Ensure shared.state is initialized before accessing attributes if shared.state.job_count is not None and shared.state.job_no is not None: if shared.state.job_count > 0: current_progress += shared.state.job_no / shared.state.job_count @@ -153,14 +194,13 @@ def progressapi(req: ProgressRequest): predicted_duration = elapsed_since_start / current_progress current_eta = predicted_duration - elapsed_since_start else: - current_eta = None # Explicitly None if progress is 0 + current_eta = None else: - current_eta = None # Explicitly None if time_start is not set + current_eta = None - # current_live_preview remains None by default - # current_id_live_preview is already req.id_live_preview + # Live preview if opts is not None and opts.live_previews_enable and req.live_preview: - shared.state.set_current_image() # Make sure this method exists and is safe + shared.state.set_current_image() if shared.state.id_live_preview != req.id_live_preview: image = shared.state.current_image if image is not None: @@ -178,8 +218,8 @@ def progressapi(req: ProgressRequest): current_live_preview = f"data:image/{image_format};base64,{base64_image}" current_id_live_preview = shared.state.id_live_preview - current_textinfo = shared.state.textinfo # This could be None - #print(f"DEBUG: Responding for task {req.id_task}: active={active}, queued={queued}, completed={completed}, textinfo='{current_textinfo}'") + current_textinfo = shared.state.textinfo + return ProgressResponse( active=active, queued=queued, @@ -188,7 +228,9 @@ def progressapi(req: ProgressRequest): eta=current_eta, live_preview=current_live_preview, id_live_preview=current_id_live_preview, - textinfo=current_textinfo + textinfo=current_textinfo, + queue_position=None, + queue_total=None ) @@ -198,4 +240,4 @@ def restore_progress(id_task): res = next(iter([x[1] for x in recorded_results if id_task == x[0]]), None) if res is not None: return res - return gr.update(), gr.update(), gr.update(), f"Couldn't restore progress for {id_task}: results either have been discarded or never were obtained" \ No newline at end of file + return gr.update(), gr.update(), gr.update(), f"Couldn't restore progress for {id_task}: results either have been discarded or never were obtained"