Python Worker SDK

overview »

Humatron provides a Python Worker SDK designed for building AI workers and applications using the Python programming language. This SDK is essentially a convenient wrapper around the underlying Worker API. The latest version of the Python Worker SDK is available at https://pypi.org/project/humatron-python-sdk.
The Python Worker SDK is logically divided into two distinct modules: the Worker SDK and the REST Channel SDK. While both modules are bundled into a single library, they are fully decoupled, allowing developers to use each independently based on their needs. Whether you're building autonomous AI workers or integrating complex REST-based workflows, the Python Worker SDK provides the necessary APIs.

installation »

To install the latest version of the Python Worker SDK:

```shell
pip install humatron-python-sdk locked-dict flask requests
```

NOTE: the flask dependency is optional and is used for the examples only.

worker SDK »

The Python Worker SDK's primary goal is to offer a convenient and efficient framework for building AI workers in Python, while simplifying and abstracting the complexities of the underlying Worker API protocol. Refer to the protocol details and terminology here.

requests »

The communication between your AI worker and the Humatron service is based on the REST protocol. Humatron initiates the interaction by sending HTTP POST requests to the designated endpoint of your AI worker. The AI worker must listen for these requests, process the data, and return an appropriate response to Humatron. The detailed request-handling workflow is described here.
To handle these REST requests, an AI worker built with the Python Worker SDK must be integrated with a web server capable of receiving REST HTTP requests. The web server's role is to forward incoming requests to the Python client and return the client's response as an HTTP response. Although you are free to use any web server, we provide a simple reference utility based on Flask Server (which we use in the example below).
REST requests:
  • register - This request is sent when a new worker instance is hired, just before the onboarding commences.
  • unregister - This request is sent when a worker instance is terminated.
  • pause - This request is sent when a worker instance is temporarily paused.
  • resume - This request is sent when a worker instance is resumed from the paused state.
  • message - This request is used to send one or more messages from the outside world to the worker instance.
  • interview - An interview request is used to conduct an interview for the worker template (i.e. specialist).

main classes »

The Python Worker SDK provides abstract classes that simplify request processing, along with utilities to integrate these classes into your web server. It provides two abstract client classes to help handle the required requests:
  • HumatronWorkerApi - This class mirrors the REST API, providing a single method, process_request, to implement. This method accepts a request object as input and returns a response object. You will need to manage the request state and handle all asynchronous logic yourself. This approach is flexible but requires additional effort.
  • HumatronAsyncWorker - This class is designed to handle multiple requests concurrently in asynchronous mode. It inherits from HumatronWorkerApi, but the process_payload_part method it requires you to implement receives the input request payload split into separate payload parts. This design allows each part to be processed independently, without the need to manage asynchronous execution explicitly; all response payload parts are processed in asynchronous mode. This class also provides a push_response method to push responses to the client without an initiating request. Such a response is delivered along with the next portion of responses.
After implementing one of these classes, you can integrate it into your web server by simply providing an HTTP request as input and returning the result. If you use the HumatronWorkerApi implementation, the results will be returned immediately. If you use the HumatronAsyncWorker implementation, the accumulated results will be returned with the next request.
To integrate the Python client with a web server using Flask Server, you can use the specialized utility we provide. Below is an example of how to use this utility for simple integration.

example »

Let's review a minimal implementation of an AI worker.
In this implementation, we forward every message received from the outside world to an OpenAI LLM and send the LLM's response back on the same channel the request was received on.
spec_impl.py

```
 1  # AI worker Example.
 2  from uuid import uuid4
 3  from humatron.worker.client import *
 4  from humatron.worker.utils import make_default_response_payload
 5  from langchain_core.output_parsers import StrOutputParser
 6  from langchain_core.prompts import ChatPromptTemplate
 7  from langchain_openai import ChatOpenAI
 8
 9  # Define the chat prompt template.
10  _prompt = ChatPromptTemplate.from_messages([('system', 'You are a helpful assistant.'), ('user', '{input}')])
11
12  # Worker implementation based on `python-sdk` library.
13  class HumatronWorkerChatGpt(HumatronAsyncWorker):
14      def __init__(self, openai_api_key: str):
15          super().__init__()
16          # Create the processing chain.
17          self._chain = _prompt | ChatOpenAI(openai_api_key=openai_api_key) | StrOutputParser()
18
19      # Implement the `process_payload_part` method.
20      def process_payload_part(self, rpp: RequestPayloadPart, _: Storage) -> ResponsePayloadPart:
21          # Process different types of request commands.
22          match rpp.body:
23              case RequestDataMessage(_) as data:
24                  # To simplify the example, we skip the check for sending a message to oneself, etc.
25                  match data.message:
26                      case RequestMessageEmail(_) as email:
27                          resp = self._chain.invoke({'input': email.text})
28                          resp_email = ResponseMessageEmail.make(sender=email.to, to=email.sender, subj='Demo', text=resp)
29                          return ResponseDataMessage.make(data.instance.id, data.resource_id, resp_email, data.payload_id)
30                      case RequestMessageSms(_) as sms:
31                          resp = self._chain.invoke({'input': sms.text})
32                          resp_sms = ResponseMessageSms.make(sender=sms.receiver, receiver=sms.sender, text=resp)
33                          return ResponseDataMessage.make(data.instance.id, data.resource_id, resp_sms, data.payload_id)
34                      case RequestMessageSlack(_) as slack:
35                          resp = self._chain.invoke({'input': slack.body['text']})
36                          resp_slack = ResponseMessageSlack.make(channel=slack.body['channel'], text=resp)
37                          return ResponseDataMessage.make(data.instance.id, data.resource_id, resp_slack, data.payload_id)
38                      case _:
39                          raise ValueError(f'Unexpected request: {data.message}')
40              case _:
41                  # We skip all `interview`, `register`, `unregister`, `pause` and `resume` logic.
42                  return make_default_response_payload(req_cmd=rpp.req_cmd, req_payload_part=rpp.body)
```
Comments below:
  • Lines 3, 4 - import classes from the Python SDK library. This library provides asynchronous request handling as well as a number of utility methods.
  • Line 13 - define the HumatronWorkerChatGpt class, which inherits from HumatronAsyncWorker, part of the Python SDK library.
  • Line 17 - initialize the OpenAI LLM and create the request processing chain.
  • Line 20 - implement the abstract process_payload_part method from the HumatronAsyncWorker ABC class.
  • Line 26 - handle the message request with channel_type equal to email. We return an email response to the question contained in the email, sent to the sender's address.
  • Line 30 - handle the message request with channel_type equal to sms. We return an SMS response to the question contained in the message, sent to the sender's phone number.
  • Line 34 - handle the message request with channel_type equal to slack. We return a Slack message response to the question contained in the Slack request, sent to the channel from which the request was received.
  • Lines 27, 31, 35 - send requests to the LLM, with the text field value extracted from the request body.
The final step in completing your AI worker is to wrap its implementation within a web server. Note that we use Flask Server in this example for demonstration purposes - you can use any web server of your choosing:
spec_rest.py

```
 1  # Web Server Integration Example.
 2  import os
 3  from dotenv import load_dotenv
 4  from humatron.worker.rest.flask.flask_server import start_flask_server
 5  from demo import HumatronWorkerChatGpt
 6
 7  # Start the REST server.
 8  def start() -> None:
 9      # Load the environment variables.
10      load_dotenv()
11
12      # Get the tokens from the environment.
13      req_token, resp_token = os.environ['HUMATRON_REQUEST_TOKEN'], os.environ['HUMATRON_RESPONSE_TOKEN']
14      openai_api_key = os.environ['OPENAI_API_KEY']
15      host, port, url = os.environ['REST_HOST'], int(os.environ['REST_PORT']), os.environ['REST_URL_WORKER']
16
17      worker = HumatronWorkerChatGpt(openai_api_key=openai_api_key)
18      start_flask_server(worker, req_token, resp_token, host, port, url, None, None, lambda: worker.close())
19
20  if __name__ == '__main__':
21      start()
```
Comments below:
  • Line 4 - import classes from the Python SDK library.
  • Lines 5, 18 - import and create a new instance of the HumatronWorkerChatGpt class we developed above.
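For reference, since the script calls load_dotenv(), the environment variables it reads could come from a `.env` file like the one below. The variable names are taken directly from the script above; all values are placeholders you must replace with your own:

```
# .env - placeholder values for the spec_rest.py example
HUMATRON_REQUEST_TOKEN=your_request_token
HUMATRON_RESPONSE_TOKEN=your_response_token
OPENAI_API_KEY=your_openai_api_key
REST_HOST=0.0.0.0
REST_PORT=8080
REST_URL_WORKER=/worker
```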

REST channel SDK »

The REST channel is a special communication channel that enables REST interactions between an AI worker instance and the end user. The primary goal of the Python REST Channel SDK is to provide a convenient and efficient way to build custom applications based on the REST Channel API using the Python programming language.

main classes »

The Python REST Channel SDK provides three abstract client classes to help communicate via the REST channel:
  • RestChannelClient - This class mirrors the REST Channel API, providing a single method, post, to implement. This method accepts a request object as input and returns an optional response object. Note that due to the asynchronous nature of the REST channel, post may return a response to any previous request, to the current request, or nothing at all if there is no data to return. You will need to manage the request state and handle all asynchronous logic yourself. This approach is flexible but requires additional effort.
  • RestChannelAsyncClient - This class is designed to operate in asynchronous mode, where you send requests and listen for responses. It is initialized with a callback parameter, which handles the asynchronous response parts. Requests are sent asynchronously using the post method.
  • RestChannelSyncClient - This is the synchronous version of the REST channel client. It features a single post method for sending requests and returning responses. Its API is similar to RestChannelClient, with one significant difference: while the RestChannelClient post method returns whatever response is currently available (if any), the RestChannelSyncClient post method waits for the response to its own request and returns it.

REST channel SDK example »

Below is a simple example demonstrating how to use the client classes described above.
rest_channel_client_example.py

```python
# REST Channel Base Client Example.
import time

from humatron.channels.rest.client import *

client = RestChannelClient('https://humatron.ai/restchannel', 'your_token')

while True:
    req_txt = input('Request: ')
    req_payload = RequestMessagePayloadPart.make(req_txt, 'sender', 'receiver')
    req = RestChannelRequest.make_message(req_payload)
    resp = client.post(req)

    # A response to the provided payload can be received with either the initial or a subsequent response.
    while not resp or resp.payload[0].ref_payload_id != req_payload.payload_id:
        time.sleep(1)
        resp = client.post(RestChannelRequest.make_heartbeat())

    resp_txt = resp.payload[0].text
    print(f'Response: {resp_txt}')
```
NOTE: you must send heartbeats to ensure you receive a response to your request.
rest_channel_async_client_example.py

```python
# REST Channel Asynchronous Client Example.
from humatron.channels.rest.client import *

chat = dict()

def callback(resp_part: ResponseMessagePayloadPart) -> None:
    r = chat.get(resp_part.ref_payload_id, 'Unknown request')
    print(f'>>Response `{resp_part.text}` received in request `{r}`.')

with RestChannelAsyncClient('https://humatron.ai/restchannel', 'your_token', callback) as client:
    while True:
        req_txt = input('Request: ')
        payload = RequestMessagePayloadPart.make(req_txt, 'sender', 'receiver')
        chat[payload.payload_id] = payload.text
        req = RestChannelRequest.make_message(payload)
        # The response to the provided payload will be received asynchronously.
        client.post(req)
```
NOTE: input and output messages may be mixed in the console output.
rest_channel_sync_client_example.py

```python
# REST Channel Synchronous Client Example.
from humatron.channels.rest.client import *

client = RestChannelSyncClient('https://humatron.ai/restchannel', 'your_token')

while True:
    req_txt = input('Request: ')
    req = RestChannelRequest.make_message(RequestMessagePayloadPart.make(req_txt, 'sender', 'receiver'))
    # The response to the provided payload is received in a synchronous manner.
    resp = client.post(req)
    resp_txt = resp.payload[0].text
    print(f'Response: {resp_txt}')
```
NOTE: this class handles request payloads one at a time.