Documentation AIME API Worker Interface
- class aime_api_worker_interface.APIWorkerInterface(api_server, job_type, auth_key, gpu_id=0, world_size=1, rank=0, gpu_name=None, image_metadata_params=['prompt', 'negative_prompt', 'seed', 'base_steps', 'refine_steps', 'scale', 'aesthetic_score', 'negative_aesthetic_score', 'img2img_strength', 'base_sampler', 'refine_sampler', 'base_discretization', 'refine_discretization'], print_server_status=True, request_timeout=60, worker_version=0)
Interface for deep learning models to communicate with AIME API Server.
- Parameters:
api_server (str) – Address of API Server. Example: ‘http://api.aime.team’.
job_type (str) – Type of job. Example: 'stable_diffusion_xl_txt2img'.
auth_key (str) – Key that authorizes the worker to connect to the API Server.
gpu_id (int, optional) – ID of GPU the worker runs on. Defaults to 0.
world_size (int, optional) – Number of used GPUs the worker runs on. Defaults to 1.
rank (int, optional) – ID of current GPU if world_size > 1. Defaults to 0.
gpu_name (str, optional) – Name of GPU the worker runs on. Defaults to None.
progress_received_callback (callable, optional) – Callback function with the HTTP response as argument, called when the API server has responded to send_progress(). Defaults to None.
progress_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError as argument, called when the API server did not respond to send_progress(). Defaults to None.
image_metadata_params (list, optional) – Parameters specific to the image generator model in use that are added as metadata to the generated image. Fixed parameters are Artist, ProcessingSoftware, Software, ImageEditingSoftware = AIME API <endpoint_name>. Defaults to aime_api_worker_interface.DEFAULT_IMAGE_METADATA.
- progress_data_received
True if the API server has responded to send_progress(); False while progress data is being transmitted or if an error occurred.
- Type:
bool
Examples
Minimal example: instantiate the api_worker with the URL of the API server, the job type and the auth_key, then wait for job data and send the job result:
    from aime_api_worker_interface import APIWorkerInterface

    api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
    while True:
        job_data = api_worker.job_request()
        output = do_deep_learning_worker_calculations(job_data, ...)
        api_worker.send_job_results(output)
Example usage with progress:
    from aime_api_worker_interface import APIWorkerInterface

    api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
    while True:
        job_data = api_worker.job_request()
        for step in deep_learning_worker_calculation:
            progress_in_percent = round(step*100/len(deep_learning_worker_calculation))
            progress_data = do_deep_learning_worker_calculation_step(job_data, ...)
            if api_worker.progress_data_received:
                api_worker.send_progress(progress_in_percent, progress_data)
        output = get_result()
        api_worker.send_job_results(output)
Example usage with callback:
    from aime_api_worker_interface import APIWorkerInterface

    def progress_callback(api_worker, progress, progress_data):
        # Only send new progress once the previous progress message was acknowledged
        if api_worker.progress_data_received:
            api_worker.send_progress(progress, progress_data)

    api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
    while True:
        job_data = api_worker.job_request()
        output = do_deep_learning_worker_calculation(job_data, progress_callback, api_worker, ...)
        api_worker.send_job_results(output)
Example usage with callback class:
    from aime_api_worker_interface import APIWorkerInterface

    class Callback():

        def __init__(self, api_worker):
            self.api_worker = api_worker

        def progress_callback(self, progress, progress_data):
            if self.api_worker.progress_data_received:
                self.api_worker.send_progress(progress, progress_data)

        def result_callback(self, result):
            self.api_worker.send_job_results(result)

    api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
    callback = Callback(api_worker)
    while True:
        job_data = api_worker.job_request()
        do_deep_learning_worker_calculation(job_data, callback.result_callback, callback.progress_callback, ...)
- get_current_job_data(job_id=None)
Get the job_data of the current job to be processed.
- Parameters:
job_id (string, optional) – For single job processing (max_job_batch=1) the job_id is not required. For batch job processing (max_job_batch>1) the job_id is required to specify the job whose data should be returned. Defaults to None.
- Returns:
The job_data of the only current job, or of the job with the given job_id.
- Return type:
dict
- get_current_job_batch_data()
Get the job_data of all jobs in the current batch as a list.
- Returns:
The list of job_data dicts of the current jobs to be processed.
- Return type:
list
- get_job_batch_parameter(param_name)
Get the values of a specific job_data parameter across the batch as a list.
- Returns:
The value list of the parameter across the batch
- Return type:
list
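As a rough illustration of what "the value list of the parameter across the batch" means, the sketch below collects one parameter from a list of job_data dicts. The helper name collect_batch_parameter and the sample batch are hypothetical, and this is not the library's implementation. How the real method treats jobs that lack the parameter is not documented here, so the None fallback is an assumption.

```python
def collect_batch_parameter(job_batch_data, param_name):
    """Return the value of param_name for every job in the batch, in batch order."""
    # Assumption: jobs that do not carry the parameter yield None.
    return [job_data.get(param_name) for job_data in job_batch_data]


# Hypothetical batch of two jobs.
batch = [
    {'job_id': 'JID1', 'prompt': 'a red fox', 'seed': 1},
    {'job_id': 'JID2', 'prompt': 'a blue bird', 'seed': 2},
]
prompts = collect_batch_parameter(batch, 'prompt')
```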
- has_job_finished(job_data)
Check whether a specific job has been answered with send_job_results() and is thereby finished.
- Parameters:
job_data – job_data of the job to check
- Returns:
True if job results have been sent for the job, False otherwise.
- Return type:
bool
- have_all_jobs_finished()
Check whether all jobs have been answered with send_job_results() and are therefore finished.
- Returns:
True if job results have been sent for all jobs, False otherwise.
- Return type:
bool
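The batch methods can be combined into one processing step roughly as sketched below. The function process_one_batch and the compute_result argument are hypothetical names for illustration. The api_worker argument is assumed to be an APIWorkerInterface instance, and only methods documented in this section are called on it.

```python
def process_one_batch(api_worker, compute_result, max_job_batch=4):
    """Request up to max_job_batch jobs, answer each one, and report whether all are finished."""
    job_batch_data = api_worker.job_batch_request(max_job_batch)
    for job_data in job_batch_data:
        if api_worker.has_job_finished(job_data):
            continue  # results were already sent for this job
        results = compute_result(job_data)  # hypothetical model call
        # Pass job_data so the results are attributed to the right job of the batch.
        api_worker.send_job_results(results, job_data=job_data)
    return api_worker.have_all_jobs_finished()
```

A worker would typically call such a function inside its `while True:` loop, in place of the single-job job_request() / send_job_results() pair.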
- job_request()
Worker requests a single job from the API Server on endpoint route /worker_job_request.
Calls job_batch_request() with max_job_batch=1 and returns the first job.
See job_batch_request() for more information.
- Returns:
job data with worker [INPUT] parameters received from API server.
- Return type:
dict
- job_batch_request(max_job_batch)
Worker requests a job batch from API Server on endpoint route /worker_job_request.
If there is no client job offer within job_timeout = request_timeout * 0.9, the API server responds with 'cmd': 'no_job' and the worker requests a job again on endpoint route /worker_job_request.
In multi-GPU mode (world_size > 1) only rank 0 will get the job_data.
- Parameters:
max_job_batch – Maximum number of jobs to process in one batch. Between one and max_job_batch jobs will be returned.
- Returns:
List of job data with worker [INPUT] parameters received from API server.
- Return type:
list
Examples
Example job data:
    response_data = {
        'wait_for_result': False,
        'endpoint_name': 'stable_diffusion_xl_txt2img',
        'start_time': 1700430052.505548,
        'start_time_compute': 1700430052.5124364,
        'cmd': 'job',
        'job_data': [{
            'job_id': 'JID1',
            'prompt': 'prompt',
            ...
        }],
        'progress_descriptions': {
            'progress_images': {
                'type': 'image_list',
                'image_format': 'JPEG',
                'color_space': 'RGB'
            }
        },
        'output_descriptions': {
            'images': {'type': 'image_list', 'image_format': 'JPEG', 'color_space': 'RGB'},
            'seed': {'type': 'integer'},
            'prompt': {'type': 'string'},
            'error': {'type': 'string'}
        }
    }
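To make the timeout rule and the response shape concrete, here is a small sketch. The helper names job_timeout_seconds and job_ids are hypothetical and the two sample responses are made up to match the example job data; the worker interface handles the raw response itself, so this is only meant to show how the fields relate.

```python
def job_timeout_seconds(request_timeout=60):
    """Seconds the API server waits for a client job offer before answering 'no_job'."""
    return request_timeout * 0.9


def job_ids(response_data):
    """Return the job ids carried by a response, or an empty list if it holds no job."""
    if response_data.get('cmd') != 'job':
        return []
    return [job_data['job_id'] for job_data in response_data.get('job_data', [])]


# Hypothetical responses shaped like the example job data.
with_job = {'cmd': 'job', 'job_data': [{'job_id': 'JID1', 'prompt': 'prompt'}]}
without_job = {'cmd': 'no_job'}
```

With the default request_timeout of 60 seconds, the job timeout comes out at about 54 seconds.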
- send_job_results(results, job_data=None)
Processes/converts job results and sends them to the API Server on route /worker_job_result.
- Parameters:
results (dict) – Worker [OUTPUT] result parameters (e.g. 'image', 'images' or 'text'). Example results:
{'images': [<PIL.Image.Image>, <PIL.Image.Image>, ...]}
- Returns:
Http response from API server to the worker.
- Return type:
requests.models.Response
Examples
Example response.json():
API Server received data without problems: {'cmd': 'ok'}
An error occurred in API server: {'cmd': 'error', 'msg': <error message>}
API Server received data with a warning: {'cmd': 'warning', 'msg': <warning message>}
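A worker might turn the three documented response shapes into a log line as sketched below. The function name summarize_result_response is hypothetical, and the fallback branch for any other 'cmd' value is an assumption, since only 'ok', 'error' and 'warning' are listed here.

```python
def summarize_result_response(payload):
    """Map the JSON body returned after send_job_results() to a short status string."""
    cmd = payload.get('cmd')
    if cmd == 'ok':
        return 'ok'
    if cmd in ('error', 'warning'):
        # 'msg' carries the error or warning text from the API server.
        return f"{cmd}: {payload.get('msg', '')}"
    # Assumption: anything else is reported verbatim rather than guessed at.
    return f'unexpected response: {payload!r}'
```

Typical use would be `summarize_result_response(response.json())` on the object returned by send_job_results().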
- send_progress(progress, progress_data=None, progress_received_callback=None, progress_error_callback=None, job_data=None)
Processes/converts job progress information and data and sends it to the API Server on route /worker_job_progress, asynchronously to the main thread, using Pool().apply_async() from multiprocessing.dummy. When the API server has received the progress data, self.progress_data_received is set to True. Use progress_received_callback and progress_error_callback to handle the response.
- Parameters:
progress (int) – Current progress (e.g. percent or number of generated tokens).
progress_data (dict, optional) – Dictionary with progress_images or text while the worker is computing. Example progress data: {'progress_images': [<PIL.Image.Image>, <PIL.Image.Image>, ...]}. Defaults to None.
progress_received_callback (callable, optional) – Callback function with the API server response as argument. Called when progress_data is received. Defaults to None.
progress_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError or an HTTP response with status_code == 503 as argument. Called when the API server replied with an error. Defaults to None.
job_data (dict, optional) – job_data to use instead of the received one. Defaults to None.
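The two callbacks can be wired up roughly as in the following sketch. The wrapper send_progress_if_idle and its log argument are hypothetical. The api_worker argument is assumed to be an APIWorkerInterface instance, and the keyword names are taken from the signature above.

```python
def send_progress_if_idle(api_worker, progress, progress_data=None, log=print):
    """Send progress only if the previous progress message has been acknowledged."""
    if not api_worker.progress_data_received:
        # Previous transmission is still pending (or failed): skip this update.
        return False

    def on_received(response):
        log(f'progress {progress} acknowledged')

    def on_error(error):
        log(f'progress {progress} failed: {error!r}')

    api_worker.send_progress(
        progress,
        progress_data,
        progress_received_callback=on_received,
        progress_error_callback=on_error,
    )
    return True
```

Skipping updates while a transmission is pending keeps slow connections from piling up progress messages, at the cost of the client seeing fewer intermediate steps.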
- send_batch_progress(batch_progress, progress_batch_data, progress_received_callback=None, progress_error_callback=None, job_batch_data=None)
Processes/converts job progress information and data for a batch of jobs and sends it to the API Server on route /worker_job_progress, asynchronously to the main thread, using Pool().apply_async() from multiprocessing.dummy. When the API server has received the progress data, self.progress_data_received is set to True. Use progress_received_callback and progress_error_callback to handle the response.
- Parameters:
batch_progress (list(int, int, ...)) – Current progress of each job (e.g. percent or number of generated tokens).
progress_batch_data (list(dict, dict, ...), optional) – List of dictionaries with progress_images or text while the worker is computing. Example progress data: {'progress_images': [<PIL.Image.Image>, <PIL.Image.Image>, ...]}. Defaults to None.
progress_received_callback (callable, optional) – Callback function with the API server response as argument. Called when progress_data is received. Defaults to None.
progress_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError or an HTTP response with status_code == 503 as argument. Called when the API server replied with an error. Defaults to None.
job_batch_data (list(dict, dict, ...), optional) – List of job_data dicts of the jobs that are ready to send progress. Defaults to None.
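One way to prepare the list arguments is sketched below. It assumes that batch_progress, progress_batch_data and job_batch_data are matched index by index, which the parameter descriptions suggest but do not state. The helper build_batch_progress and the two lookup dicts are hypothetical.

```python
def build_batch_progress(job_batch_data, tokens_by_job_id, text_by_job_id):
    """Build per-job progress lists in the same order as job_batch_data."""
    batch_progress = []
    progress_batch_data = []
    for job_data in job_batch_data:
        job_id = job_data['job_id']
        batch_progress.append(tokens_by_job_id[job_id])  # e.g. number of generated tokens
        progress_batch_data.append({'text': text_by_job_id[job_id]})
    return batch_progress, progress_batch_data


# Hypothetical state of a text generation worker handling two jobs.
jobs = [{'job_id': 'JID1'}, {'job_id': 'JID2'}]
progress, data = build_batch_progress(
    jobs,
    tokens_by_job_id={'JID1': 12, 'JID2': 7},
    text_by_job_id={'JID1': 'Hello', 'JID2': 'Hi'},
)
```

The resulting lists could then be passed as `api_worker.send_batch_progress(progress, data, job_batch_data=jobs)`.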
- async_check_server_connection(check_server_callback=None, check_server_error_callback=None, terminal_output=True)
Non-blocking check of the API server status on route /worker_check_server_status using Pool().apply_async() from multiprocessing.dummy.
- Parameters:
check_server_callback (callable, optional) – Callback function with the API server response as argument. Called after a successful server check. Defaults to None.
check_server_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError as argument. Called when the server replied with an error. Defaults to None.
terminal_output (bool, optional) – Prints server status to terminal if True. Defaults to True.
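A possible way to use the two callbacks is to record the outcome in a shared dict, as in this sketch. The function start_server_check and the status dict layout are hypothetical. The api_worker argument is assumed to be an APIWorkerInterface instance, and the keyword names are taken from the signature above.

```python
def start_server_check(api_worker, status):
    """Start a non-blocking server check and record its outcome in the status dict."""

    def on_ok(response):
        status['online'] = True

    def on_error(error):
        status['online'] = False
        status['last_error'] = error

    api_worker.async_check_server_connection(
        check_server_callback=on_ok,
        check_server_error_callback=on_error,
        terminal_output=False,  # keep the terminal quiet, the callbacks record the result
    )
```

Because the check runs asynchronously, the status dict is only filled in once one of the callbacks has fired.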
- check_periodically_if_server_online(interval_seconds=1)
Checks periodically, every interval_seconds (1 second by default), whether the server is online by sending POST requests on route /worker_check_server_status.
- Parameters:
interval_seconds (int, optional) – Interval in seconds between checks. Defaults to 1.
- Returns:
True if server is available again
- Return type:
bool
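The polling behaviour described here follows a common pattern, sketched generically below. This is not the library's implementation. The function wait_until_online and its is_online and sleep arguments are hypothetical, with is_online standing in for the status request.

```python
import time


def wait_until_online(is_online, interval_seconds=1, sleep=time.sleep):
    """Block until is_online() returns True, checking once every interval_seconds."""
    while not is_online():
        sleep(interval_seconds)  # wait before asking again
    return True
```

Passing sleep as an argument makes the loop easy to exercise without real waiting.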
- get_pnginfo_metadata(job_data)
Parses and returns image metadata from job_data.
- Returns:
PngInfo Object with metadata for PNG images
- Return type:
PIL.PngImagePlugin.PngInfo
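If a worker wants to keep a local PNG copy with the same metadata, the returned PngInfo object can be handed to Pillow's Image.save() through its pnginfo parameter, as in this sketch. The function encode_png_with_metadata is a hypothetical helper. The api_worker argument is assumed to be an APIWorkerInterface instance and image a PIL.Image.Image.

```python
import io


def encode_png_with_metadata(api_worker, image, job_data):
    """Encode image as PNG bytes, embedding the metadata derived from job_data."""
    pnginfo = api_worker.get_pnginfo_metadata(job_data)
    buffer = io.BytesIO()
    # Pillow's PNG writer accepts the metadata through the pnginfo keyword.
    image.save(buffer, format='PNG', pnginfo=pnginfo)
    return buffer.getvalue()
```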
- static get_version()
Parses name and version of AIME API Worker Interface with pkg_resources
- Returns:
Name and version of AIME API Worker Interface
- Return type:
str