cellmap_flow.utils.bsub_utils

Utilities for job submission and management across different execution environments.

Supported execution environments:
  • LSF (bsub) cluster jobs

  • Local process execution

The design is extensible to cloud providers and other cluster types.

Attributes

logger

DEFAULT_SECURITY

DEFAULT_QUEUE

DEFAULT_CHARGE_GROUP

SERVER_COMMAND

Classes

JobStatus

Enumeration of possible job statuses.

Job

Abstract base class for jobs across different execution environments.

LocalJob

Job running as a local subprocess.

LSFJob

Job submitted to LSF cluster via bsub.

Functions

extract_host_from_output(→ Optional[str])

Extract host/URL from command output using configured patterns.

cleanup_handler(→ None)

Signal handler for graceful shutdown.

is_bsub_available(→ bool)

Check if bsub command is available in the system PATH.

submit_bsub_job(→ LSFJob)

Submit a job to LSF cluster using bsub.

run_locally(→ LocalJob)

Run command locally as a subprocess (fallback when bsub unavailable).

start_hosts(→ Job)

Start a server job either via bsub or locally.

Module Contents

cellmap_flow.utils.bsub_utils.logger
cellmap_flow.utils.bsub_utils.DEFAULT_SECURITY = 'http'
cellmap_flow.utils.bsub_utils.DEFAULT_QUEUE = 'gpu_h100'
cellmap_flow.utils.bsub_utils.DEFAULT_CHARGE_GROUP = 'cellmap'
cellmap_flow.utils.bsub_utils.SERVER_COMMAND = 'cellmap_flow_server'
class cellmap_flow.utils.bsub_utils.JobStatus(*args, **kwds)

Enumeration of possible job statuses.

PENDING = 'pending'
RUNNING = 'running'
COMPLETED = 'completed'
FAILED = 'failed'
KILLED = 'killed'
class cellmap_flow.utils.bsub_utils.Job(model_name: str | None = None)

Abstract base class for jobs across different execution environments.

Subclasses should implement:
  • kill() – Terminate the job

  • get_status() – Get current job status

  • wait_for_host() – Wait for and extract host information

model_name = None
status
host: str | None = None
abstractmethod kill() → None

Terminate the job.

abstractmethod get_status() → JobStatus

Get the current status of the job.

abstractmethod wait_for_host(timeout: int = 300) → str | None

Wait for the job to provide host information.

Parameters:

timeout – Maximum time to wait in seconds

Returns:

Host URL if found, None otherwise

is_running() → bool

Check if the job is currently running.
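
As a rough illustration of the subclassing contract described above, the sketch below uses stand-ins for JobStatus and Job rather than the real classes. The initial status, the is_running() logic, and the FakeJob class are assumptions for illustration, not the module's actual implementation.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional


class JobStatus(Enum):
    """Stand-in mirroring the documented status values."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    KILLED = "killed"


class Job(ABC):
    """Stand-in for the documented abstract base class."""

    def __init__(self, model_name: Optional[str] = None):
        self.model_name = model_name
        self.status = JobStatus.PENDING  # assumed initial value
        self.host: Optional[str] = None

    @abstractmethod
    def kill(self) -> None: ...

    @abstractmethod
    def get_status(self) -> JobStatus: ...

    @abstractmethod
    def wait_for_host(self, timeout: int = 300) -> Optional[str]: ...

    def is_running(self) -> bool:
        # Assumed behavior: "running" means get_status() reports RUNNING.
        return self.get_status() == JobStatus.RUNNING


class FakeJob(Job):
    """Hypothetical in-memory job, e.g. for unit tests."""

    def __init__(self, host: str, model_name: Optional[str] = None):
        super().__init__(model_name)
        self._host = host
        self.status = JobStatus.RUNNING

    def kill(self) -> None:
        self.status = JobStatus.KILLED

    def get_status(self) -> JobStatus:
        return self.status

    def wait_for_host(self, timeout: int = 300) -> Optional[str]:
        # A real backend would poll until the timeout; this one answers at once.
        self.host = self._host
        return self.host
```

A new backend (for example a cloud provider) would follow the same shape: implement the three abstract methods and inherit is_running().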

class cellmap_flow.utils.bsub_utils.LocalJob(process: subprocess.Popen, model_name: str | None = None)

Job running as a local subprocess.

process
kill() → None

Terminate the local process.

get_status() → JobStatus

Get current status by checking process state.

wait_for_host(timeout: int = 60) → str | None

Monitor process output for host information.

Parameters:

timeout – Maximum time to wait in seconds

Returns:

Host URL if found, None otherwise
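
One possible way to monitor a subprocess for a URL without blocking past the timeout is to read its output on a helper thread. The sketch below is an assumption about how such monitoring could work, not the LocalJob implementation; wait_for_url and URL_PATTERN are hypothetical names.

```python
import queue
import re
import subprocess
import sys
import threading
import time
from typing import Optional

URL_PATTERN = re.compile(r"https?://\S+")  # assumed pattern, not the module's


def wait_for_url(process: subprocess.Popen, timeout: float = 60) -> Optional[str]:
    """Return the first URL printed by `process`, or None on timeout or EOF."""
    if process.stdout is None:
        raise ValueError("process must be started with stdout=subprocess.PIPE")
    lines: "queue.Queue[Optional[str]]" = queue.Queue()

    def pump() -> None:
        # Reader thread: forward each output line, then a None sentinel at EOF.
        for line in process.stdout:
            lines.put(line)
        lines.put(None)

    threading.Thread(target=pump, daemon=True).start()
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        try:
            line = lines.get(timeout=remaining)
        except queue.Empty:
            return None
        if line is None:  # output closed without a URL appearing
            return None
        match = URL_PATTERN.search(line)
        if match:
            return match.group(0)


if __name__ == "__main__":
    demo = subprocess.Popen(
        [sys.executable, "-c", "print('Serving on http://127.0.0.1:8000')"],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
    )
    print(wait_for_url(demo, timeout=10))
    demo.wait()
```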

class cellmap_flow.utils.bsub_utils.LSFJob(job_id: str, model_name: str | None = None)

Job submitted to LSF cluster via bsub.

job_id
kill() → None

Terminate the LSF job using bkill.

get_status() → JobStatus

Query LSF for job status using bjobs.

wait_for_host(timeout: int = 300) → str | None

Monitor LSF job output using bpeek to extract host information.

Parameters:

timeout – Maximum time to wait in seconds

Returns:

Host URL if found, None otherwise
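
To make the bjobs-based status query more concrete, here is a hedged sketch of translating an LSF state string into a JobStatus. The mapping table, the treatment of unknown states, and the helper names are assumptions; the bjobs options shown assume a reasonably recent LSF release and may not match what LSFJob actually runs.

```python
import subprocess
from enum import Enum


class JobStatus(Enum):
    """Stand-in mirroring the documented status values."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    KILLED = "killed"


# Assumed mapping from LSF STAT values to JobStatus.
_LSF_STATES = {
    "PEND": JobStatus.PENDING,
    "PSUSP": JobStatus.PENDING,
    "RUN": JobStatus.RUNNING,
    "DONE": JobStatus.COMPLETED,
    "EXIT": JobStatus.FAILED,
}


def status_from_lsf_state(state: str) -> JobStatus:
    """Translate a bjobs STAT value; unknown or empty states count as FAILED here."""
    return _LSF_STATES.get(state.strip().upper(), JobStatus.FAILED)


def query_lsf_status(job_id: str) -> JobStatus:
    """Ask bjobs for one job's state (requires an LSF installation)."""
    result = subprocess.run(
        ["bjobs", "-noheader", "-o", "stat", job_id],
        capture_output=True, text=True, check=False,
    )
    return status_from_lsf_state(result.stdout)
```

Note that LSF reports a killed job as EXIT, so distinguishing KILLED from FAILED would need extra bookkeeping, such as recording that kill() was called.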

cellmap_flow.utils.bsub_utils.extract_host_from_output(output: str) → str | None

Extract host/URL from command output using configured patterns.

Parameters:

output – String output to search

Returns:

Host URL if found, None otherwise
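
The "configured patterns" are not listed on this page. As a minimal sketch of pattern-based extraction, the example below tries a list of regular expressions in order; the patterns and the name extract_host are hypothetical.

```python
import re
from typing import Optional

# Hypothetical patterns, tried in order; the module's real patterns may differ.
HOST_PATTERNS = [
    re.compile(r"Running on (https?://\S+)"),
    re.compile(r"(https?://[\w.\-]+:\d+)"),
]


def extract_host(output: str) -> Optional[str]:
    """Return the first host URL matched by any pattern, else None."""
    for pattern in HOST_PATTERNS:
        match = pattern.search(output)
        if match:
            return match.group(1)
    return None
```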

cellmap_flow.utils.bsub_utils.cleanup_handler(signum: int, frame) → None

Signal handler for graceful shutdown. Kills all tracked jobs before exiting.
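
A handler of this kind is typically registered with the standard signal module. The sketch below assumes a module-level job registry (tracked_jobs) and an exit code of 0; both are illustrative assumptions, as is the install_handlers helper.

```python
import signal
import sys
from typing import List

tracked_jobs: List = []  # hypothetical registry of objects with a kill() method


def cleanup(signum: int, frame) -> None:
    """Kill every tracked job, then exit the interpreter."""
    for job in list(tracked_jobs):
        try:
            job.kill()
        except Exception as exc:  # keep going so one failure does not block the rest
            print(f"Failed to kill {job!r}: {exc}", file=sys.stderr)
    tracked_jobs.clear()
    sys.exit(0)  # assumed exit code


def install_handlers() -> None:
    """Register cleanup for Ctrl-C and termination; call from the main thread."""
    signal.signal(signal.SIGINT, cleanup)
    signal.signal(signal.SIGTERM, cleanup)
```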

cellmap_flow.utils.bsub_utils.is_bsub_available() → bool

Check if bsub command is available in the system PATH.

cellmap_flow.utils.bsub_utils.submit_bsub_job(command: str, queue: str = DEFAULT_QUEUE, charge_group: str | None = None, job_name: str = 'my_job', num_gpus: int = 1, num_cpus: int = 4) → LSFJob

Submit a job to LSF cluster using bsub.

Parameters:
  • command – Shell command to execute

  • queue – LSF queue name

  • charge_group – Project/chargeback group for billing

  • job_name – Name for the job

  • num_gpus – Number of GPUs to request

  • num_cpus – Number of CPUs to request

Returns:

LSFJob object for the submitted job

Raises:

subprocess.CalledProcessError – If job submission fails
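
To show how the parameters above could map onto a bsub invocation, here is a sketch that builds the argument list and parses the job ID from bsub's confirmation message. The exact flags the module passes are not documented here, so the option layout and both helper names are assumptions.

```python
import re
from typing import List, Optional


def build_bsub_args(
    command: str,
    queue: str = "gpu_h100",
    charge_group: Optional[str] = None,
    job_name: str = "my_job",
    num_gpus: int = 1,
    num_cpus: int = 4,
) -> List[str]:
    """Assemble a bsub argument list (assumed layout, not the module's own)."""
    args = ["bsub", "-J", job_name, "-q", queue, "-n", str(num_cpus)]
    if charge_group:
        args += ["-P", charge_group]  # project used for accounting
    if num_gpus > 0:
        args += ["-gpu", f"num={num_gpus}"]
    args.append(command)  # passed as one string so shell quoting is preserved
    return args


def parse_job_id(bsub_output: str) -> Optional[str]:
    """Extract the ID from a line like 'Job <12345> is submitted to queue <q>.'"""
    match = re.search(r"Job <(\d+)>", bsub_output)
    return match.group(1) if match else None
```

Running the list with subprocess.run(args, capture_output=True, text=True, check=True) raises subprocess.CalledProcessError on a non-zero exit, which is consistent with the exception documented above.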

cellmap_flow.utils.bsub_utils.run_locally(command: str, name: str) → LocalJob

Run command locally as a subprocess (fallback when bsub unavailable).

Parameters:
  • command – Shell command to execute

  • name – Job name for tracking

Returns:

LocalJob object with process information
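
A local fallback of this kind usually wraps subprocess.Popen. The sketch below shows one plausible set of Popen options (shell execution, merged output, text mode); these are assumptions rather than the options run_locally actually uses, and run_command_locally is a hypothetical name.

```python
import subprocess


def run_command_locally(command: str) -> subprocess.Popen:
    """Start `command` through the shell and capture its combined output."""
    return subprocess.Popen(
        command,
        shell=True,                # the command is documented as a shell command
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so one stream can be scanned
        text=True,
    )
```

The returned Popen object matches the process argument that LocalJob accepts in its constructor.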

cellmap_flow.utils.bsub_utils.start_hosts(command: str, queue: str = DEFAULT_QUEUE, charge_group: str | None = None, job_name: str = 'example_job', use_https: bool = False, wait_for_host: bool = True) → Job

Start a server job either via bsub or locally.

Parameters:
  • command – Command to execute

  • queue – LSF queue name (for bsub)

  • charge_group – Project for billing (for bsub)

  • job_name – Name for the job

  • use_https – Whether to use HTTPS (adds cert/key flags)

  • wait_for_host – Whether to wait for host information before returning

Returns:

Job object (LSFJob or LocalJob) with job information
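
The choice between bsub and local execution can be pictured as a small dispatch step. The sketch below injects the two backends as callables so it stays self-contained; the dispatch function and its parameters are hypothetical, and the real start_hosts may decide differently.

```python
import shutil
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def dispatch(
    command: str,
    submit: Callable[[str], T],
    run_local: Callable[[str], T],
    bsub_available: Optional[bool] = None,
) -> T:
    """Use `submit` when bsub is on the PATH, otherwise fall back to `run_local`."""
    if bsub_available is None:
        bsub_available = shutil.which("bsub") is not None
    return submit(command) if bsub_available else run_local(command)
```

Passing bsub_available explicitly makes the fallback path easy to exercise on a machine without LSF.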