Source code for glance.task_cancellation_tracker

# Copyright 2025 RedHat Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
import os
import time

from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging

from glance.common import exception


LOG = logging.getLogger(__name__)
CONF = cfg.CONF


[docs] def get_data_dir(): """Return the filesystem store data directory from config.""" if CONF.enabled_backends: return CONF.os_glance_tasks_store.filesystem_store_datadir else: # NOTE(abhishekk): strip the 'file://' prefix from the URI return CONF.node_staging_uri[7:]
[docs] def path_for_op(operation_id, prefix='running-task-'): """Construct the file path for a given operation ID.""" return os.path.join(get_data_dir(), "%s%s" % (prefix, operation_id))
[docs] def is_canceled(operation_id): """ Check if the operation has been canceled (file exists and is nonzero length). """ operation_path = path_for_op(operation_id) return os.path.exists(operation_path) and os.path.getsize( operation_path) > 0
[docs] def register_operation(operation_id): """Register a new operation by creating a lock file.""" with lockutils.external_lock('tasks'): operation_path = path_for_op(operation_id) try: # Use os.open with O_CREAT | O_EXCL to ensure atomic creation fd = os.open(operation_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) os.close(fd) except FileExistsError: # Handle the case where the lock file already exists raise RuntimeError(f"Operation {operation_id} is " f"already registered.")
[docs] def cancel_operation(operation_id): """ Mark an operation as canceled by writing to the lock file if it exists. """ with lockutils.external_lock('tasks'): operation_path = path_for_op(operation_id) if not os.path.exists(operation_path): raise exception.ServerError( "Operation file for %s does not exist, cannot cancel.", operation_id) with open(operation_path, 'w') as f: f.write(str(operation_id)) # Wait for the system to acknowledge the cancellation for _ in range(60): if os.path.exists(operation_path): time.sleep(0.5) continue return # If still not canceled after timeout, raise an exception raise exception.ServerError("Timeout canceling in-progress " "task %s" % operation_id)
[docs] def signal_finished(operation_id): """Remove the lock file to signal that the operation is canceled.""" with lockutils.external_lock('tasks'): operation_path = path_for_op(operation_id) try: os.remove(operation_path) except FileNotFoundError: LOG.warning("Attempted to signal finished for operation %s, " "but the operation file does not " "exist.", operation_path)