AIME API Worker Interface Documentation

class aime_api_worker_interface.APIWorkerInterface(api_server, job_type, auth_key, gpu_id=0, world_size=1, rank=0, gpu_name=None, image_metadata_params=['prompt', 'negative_prompt', 'seed', 'base_steps', 'refine_steps', 'scale', 'aesthetic_score', 'negative_aesthetic_score', 'img2img_strength', 'base_sampler', 'refine_sampler', 'base_discretization', 'refine_discretization'], print_server_status=True, request_timeout=60, worker_version=0)

Interface for deep learning models to communicate with AIME API Server.

Parameters:
  • api_server (str) – Address of API Server. Example: ‘http://api.aime.team’.

  • job_type (str) – Type of job. Example: 'stable_diffusion_xl_txt2img'.

  • auth_key (str) – Key to authorize the worker to connect with the API Server.

  • gpu_id (int, optional) – ID of GPU the worker runs on. Defaults to 0.

  • world_size (int, optional) – Number of GPUs the worker runs on. Defaults to 1.

  • rank (int, optional) – ID of current GPU if world_size > 1. Defaults to 0.

  • gpu_name (str, optional) – Name of GPU the worker runs on. Defaults to None.

  • progress_received_callback (callable, optional) – Callback function with the HTTP response as argument, called when the API server has responded to send_progress(..). Defaults to None.

  • progress_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError as argument, called when the API server did not respond to send_progress(..). Defaults to None.

  • image_metadata_params (list, optional) – Parameters specific to the image generator model in use, added as metadata to the generated image. Fixed parameters are Artist, ProcessingSoftware, Software, ImageEditingSoftware = AIME API <endpoint_name>. Defaults to aime_api_worker_interface.DEFAULT_IMAGE_METADATA.

progress_data_received

True if the API server has responded to send_progress(), False while progress data is being transmitted or if an error occurred.

Type:

bool

Examples

Minimal example: instantiate the api_worker with the URL of the API server, the job type and the auth_key, then wait for job data and send the job results:

from aime_api_worker_interface import APIWorkerInterface

api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
while True:
    job_data = api_worker.job_request()
    output = do_deep_learning_worker_calculations(job_data, ...)
    api_worker.send_job_results(output)

Example usage with progress:

from aime_api_worker_interface import APIWorkerInterface

api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
while True:
    job_data = api_worker.job_request()

    for step in range(len(deep_learning_worker_calculation)):
        progress_in_percent = round(step * 100 / len(deep_learning_worker_calculation))
        progress_data = do_deep_learning_worker_calculation_step(job_data, ...)
        if api_worker.progress_data_received:
            api_worker.send_progress(progress_in_percent, progress_data)
    output = get_result()
    api_worker.send_job_results(output)

Example usage with callback:

from aime_api_worker_interface import APIWorkerInterface

def progress_callback(api_worker, progress, progress_data):
    if api_worker.progress_data_received:
        api_worker.send_progress(progress, progress_data)


api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)

while True:
    job_data = api_worker.job_request()
    output = do_deep_learning_worker_calculation(job_data, progress_callback, api_worker, ...)
    api_worker.send_job_results(output)

Example usage with callback class:

from aime_api_worker_interface import APIWorkerInterface

class Callback:

    def __init__(self, api_worker):
        self.api_worker = api_worker


    def progress_callback(self, progress, progress_data):
        if self.api_worker.progress_data_received:
            self.api_worker.send_progress(progress, progress_data)

    def result_callback(self, result):
        self.api_worker.send_job_results(result)


api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
callback = Callback(api_worker)

while True:
    job_data = api_worker.job_request()
    do_deep_learning_worker_calculation(job_data, callback.result_callback, callback.progress_callback, ...)

get_current_job_data(job_id=None)

Get the job_data of the current job to be processed.

Parameters:

job_id (str, optional) – For single job processing (max_job_batch=1) the job_id is not required. For batch job processing (max_job_batch>1) the job_id is required to specify the job whose data should be returned. Defaults to None.

Returns:

The job_data of the current job, or of the job with the given job_id

Return type:

dict

get_current_job_batch_data()

Get the job_data dicts of the current batch as a list.

Returns:

The list of job_data dicts of the current jobs to be processed

Return type:

list

get_job_batch_parameter(param_name)

Get the values of a specific job_data parameter across the current batch as a list.

Parameters:

param_name (str) – Name of the parameter whose values should be collected across the batch.

Returns:

The list of values of the parameter across the batch

Return type:

list
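
The batch accessors can be sketched in plain Python. Here job_batch_data and its job_data dicts are hypothetical stand-ins for the worker's internal batch state, not the actual implementation:

```python
# Hypothetical batch of job_data dicts, as get_current_job_batch_data()
# would return it for a batch of two jobs.
job_batch_data = [
    {'job_id': 'JID1', 'prompt': 'a cat', 'seed': 1},
    {'job_id': 'JID2', 'prompt': 'a dog', 'seed': 2},
]

def get_job_batch_parameter(param_name):
    # Collect the value of one parameter across all jobs of the batch
    return [job_data.get(param_name) for job_data in job_batch_data]

prompts = get_job_batch_parameter('prompt')  # ['a cat', 'a dog']
```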

has_job_finished(job_data)

Check whether the given job has been answered with send_job_results() and is thereby finished.

Parameters:

job_data – job_data of the job to check

Returns:

True if send_job_results() has been called for the job, False otherwise

Return type:

bool

have_all_jobs_finished()

Check whether all jobs have been answered with send_job_results() and are therefore finished.

Returns:

True if send_job_results() has been called for all jobs, False otherwise

Return type:

bool
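
The semantics of the two checks can be sketched with a hypothetical bookkeeping set; the real interface records internally which jobs have been answered with send_job_results():

```python
# Hypothetical sketch of the finished-job bookkeeping.
finished_job_ids = set()

def mark_job_finished(job_data):
    # Stand-in for what send_job_results() records per job
    finished_job_ids.add(job_data['job_id'])

def has_job_finished(job_data):
    return job_data['job_id'] in finished_job_ids

def have_all_jobs_finished(job_batch_data):
    return all(has_job_finished(job_data) for job_data in job_batch_data)

batch = [{'job_id': 'JID1'}, {'job_id': 'JID2'}]
mark_job_finished(batch[0])  # only the first job is finished so far
```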

job_request()

Worker requests a single job from the API Server on endpoint route /worker_job_request.

Calls job_batch_request() with a max_job_batch size of 1 and returns the first job.

See job_batch_request() for more information.

Returns:

job data with worker [INPUT] parameters received from API server.

Return type:

dict

job_batch_request(max_job_batch)

Worker requests a job batch from API Server on endpoint route /worker_job_request.

If there is no client job offer within job_timeout = request_timeout * 0.9, the API server responds with 'cmd': 'no_job' and the worker requests a job again on endpoint route /worker_job_request.

In multi-GPU mode (world_size > 1) only rank 0 will get the job_data.

Parameters:

max_job_batch (int) – Maximum batch size of jobs to process. Between one and max_job_batch jobs will be returned.

Returns:

List of job data with worker [INPUT] parameters received from API server.

Return type:

list

Examples

Example job data:

response_data = {
    'wait_for_result': False,
    'endpoint_name': 'stable_diffusion_xl_txt2img',
    'start_time': 1700430052.505548,
    'start_time_compute': 1700430052.5124364,
    'cmd': 'job',
    'job_data': [{
        'job_id': 'JID1',
        'prompt': 'prompt',
        ...
    }],
    'progress_descriptions': {
        'progress_images': {
            'type': 'image_list', 'image_format': 'JPEG', 'color_space': 'RGB'
        }
    },
    'output_descriptions': {
        'images': {'type': 'image_list', 'image_format': 'JPEG', 'color_space': 'RGB'},
        'seed': {'type': 'integer'},
        'prompt': {'type': 'string'},
        'error': {'type': 'string'}
    }
}
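
Given a response like the one above, the jobs of a batch are processed by iterating the job_data list. A self-contained sketch with a minimal stand-in response dict:

```python
# Minimal stand-in for a /worker_job_request response with two batch jobs
response_data = {
    'cmd': 'job',
    'job_data': [
        {'job_id': 'JID1', 'prompt': 'first prompt'},
        {'job_id': 'JID2', 'prompt': 'second prompt'},
    ],
}

processed = []
if response_data['cmd'] == 'job':
    for job_data in response_data['job_data']:
        # One worker computation per job of the batch
        processed.append(job_data['job_id'])
```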

send_job_results(results, job_data=None)

Process/convert job results and send them to the API Server on route /worker_job_result.

Parameters:
  • results (dict) – worker [OUTPUT] result parameters (e.g. 'image', 'images' or 'text'). Example results: {'images': [<PIL.Image.Image>, <PIL.Image.Image>, ...]}

  • job_data (dict, optional) – To use different job_data than the received one. Defaults to None.

Returns:

Http response from API server to the worker.

Return type:

requests.models.Response

Examples

Example response.json():

API Server received data without problems:          {'cmd': 'ok'}
An error occurred in the API server:                {'cmd': 'error', 'msg': <error message>}
API Server received data with a warning:            {'cmd': 'warning', 'msg': <warning message>}
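
The three documented response variants can be handled with a simple dispatch on the 'cmd' field. A sketch; handle_server_response is a hypothetical helper, not part of the interface:

```python
def handle_server_response(response_json):
    # Dispatch on the 'cmd' field of the API server response
    cmd = response_json.get('cmd')
    if cmd == 'ok':
        return 'ok'
    if cmd == 'warning':
        # Data was accepted, but the server attached a message
        return 'warning: ' + response_json.get('msg', '')
    if cmd == 'error':
        raise RuntimeError(response_json.get('msg', 'unknown error'))
    raise ValueError('unexpected cmd: %r' % cmd)
```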
send_progress(progress, progress_data=None, progress_received_callback=None, progress_error_callback=None, job_data=None)

Processes/converts job progress information and data and sends it to the API Server on route /worker_job_progress, asynchronously to the main thread using Pool().apply_async() from multiprocessing.dummy. When the API server has received the progress data, self.progress_data_received is set to True. Use progress_received_callback and progress_error_callback to handle the response.

Parameters:
  • progress (int) – Current progress (e.g. percent or number of generated tokens)

  • progress_data (dict, optional) – Dictionary with progress_images or text while the worker is computing. Example progress data: {'progress_images': [<PIL.Image.Image>, <PIL.Image.Image>, ...]}. Defaults to None.

  • progress_received_callback (callable, optional) – Callback function with API server response as argument. Called when progress_data is received. Defaults to None.

  • progress_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError or an HTTP response with status_code == 503 as argument. Called when the API server replied with an error. Defaults to None.

  • job_data (dict, optional) – To use different job_data than the received one. Defaults to None.

send_batch_progress(batch_progress, progress_batch_data, progress_received_callback=None, progress_error_callback=None, job_batch_data=None)

Processes/converts job progress information and data and sends it to the API Server on route /worker_job_progress, asynchronously to the main thread using Pool().apply_async() from multiprocessing.dummy. When the API server has received the progress data, self.progress_data_received is set to True. Use progress_received_callback and progress_error_callback to handle the response.

Parameters:
  • batch_progress (list(int, int, ...)) – Current progress values per job (e.g. percent or number of generated tokens)

  • progress_batch_data (list(dict, dict, ...), optional) – List of dictionaries with progress_images or text while the worker is computing. Example progress data: {'progress_images': [<PIL.Image.Image>, <PIL.Image.Image>, ...]}. Defaults to None.

  • progress_received_callback (callable, optional) – Callback function with API server response as argument. Called when progress_data is received. Defaults to None.

  • progress_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError or http response with :status_code == 503: as argument. Called when API server replied with error. Defaults to None.

  • job_batch_data (list(dict, dict, ...), optional) – List of job data concerning the jobs ready to send progress. Defaults to None.

async_check_server_connection(check_server_callback=None, check_server_error_callback=None, terminal_output=True)

Non-blocking check of the API server status on route /worker_check_server_status using Pool().apply_async() from multiprocessing.dummy.

Parameters:
  • check_server_callback (callable, optional) – Callback function with the API server response as argument. Called after a successful server check. Defaults to None.

  • check_server_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError as argument. Called when the server replied with an error. Defaults to None.

  • terminal_output (bool, optional) – Prints server status to terminal if True. Defaults to True.

check_periodically_if_server_online(interval_seconds=1)

Periodically checks every interval_seconds whether the server is online via POST requests on route /worker_check_server_status.

Parameters:

interval_seconds (int) – Interval in seconds between online checks. Defaults to 1.

Returns:

True if server is available again

Return type:

bool
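
The blocking retry behaviour can be sketched as a generic polling helper, where check_server is a hypothetical stand-in for the actual POST request to /worker_check_server_status:

```python
import time

def wait_until_online(check_server, interval_seconds=1):
    # Poll check_server() every interval_seconds until it reports the
    # server as reachable, then return True (mirrors the documented
    # behaviour of check_periodically_if_server_online)
    while not check_server():
        time.sleep(interval_seconds)
    return True
```

In the real interface the check is an HTTP request and the interval defaults to one second.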

get_pnginfo_metadata(job_data)

Parses and returns image metadata from job_data.

Returns:

PngInfo Object with metadata for PNG images

Return type:

PIL.PngImagePlugin.PngInfo

static get_version()

Parses the name and version of the AIME API Worker Interface with pkg_resources

Returns:

Name and version of AIME API Worker Interface

Return type:

str